package org.opendaylight.openflowplugin.api.openflow.device;
-import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
-import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.openflowplugin.api.openflow.OFPContext;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
-import org.opendaylight.yangtools.yang.common.RpcResult;
/**
* The central entity of OFP is the Device Context, which encapsulate the logical state of a switch
*/
ItemLifeCycleRegistry getItemLifeCycleSourceRegistry();
- /**
- * Setter for sal role service.
- * @param salRoleService role service
- */
- void setSalRoleService(@Nonnull SalRoleService salRoleService);
-
- /**
- * Make device slave.
- * @return listenable future from sal role service
- */
- ListenableFuture<RpcResult<SetRoleOutput>> makeDeviceSlave();
-
/**
* Checks if device and controller supports single layer serialization.
* @return true if single layer serialization is supported
*/
void registerServices(ClusterSingletonServiceProvider clusterSingletonServiceProvider);
- /**
- * After connect of device make this device SLAVE.
- */
- void makeDeviceSlave();
-
/**
* Check all needed to be master.
* @param mastershipState - state master on device, initial gather, initial submit, initial registry fill
* @param deviceRemovedHandler device removed handler
*/
void registerDeviceRemovedHandler(@Nonnull DeviceRemovedHandler deviceRemovedHandler);
-}
+}
\ No newline at end of file
/**
* Change to SLAVE role on device was not able.
* @param deviceInfo connected switch identification
+ * @param reason reason
*/
- void onSlaveRoleNotAcquired(DeviceInfo deviceInfo);
+ void onSlaveRoleNotAcquired(DeviceInfo deviceInfo, String reason);
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.api.openflow.role;
+
+import org.opendaylight.openflowplugin.api.openflow.OFPContext;
+import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
+
+/**
+ * Handles propagation of SLAVE and MASTER roles on connected devices.
+ */
+public interface RoleContext extends OFPContext, RequestContextStack {
+ /**
+ * Sets role service.
+ *
+ * @param salRoleService the sal role service
+ */
+ void setRoleService(SalRoleService salRoleService);
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.api.openflow.role;
+
+import javax.annotation.Nonnull;
+import org.opendaylight.openflowplugin.api.openflow.OFPManager;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+
+/**
+ * Manages creation and termination of role contexts.
+ * @see org.opendaylight.openflowplugin.api.openflow.role.RoleContext
+ */
+public interface RoleManager extends OFPManager {
+ /**
+ * Create role context.
+ *
+ * @param deviceContext the device context
+ * @return the role context
+ */
+ RoleContext createContext(@Nonnull DeviceContext deviceContext);
+}
\ No newline at end of file
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
+import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
+import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
private DeviceManager deviceManager;
private RpcManager rpcManager;
private StatisticsManager statisticsManager;
+ private RoleManager roleManager;
private ConnectionManager connectionManager;
private ThreadPoolExecutor threadPool;
private ContextChainHolderImpl contextChainHolder;
hashedWheelTimer,
convertorManager);
+ roleManager = new RoleManagerImpl(hashedWheelTimer);
+
contextChainHolder = new ContextChainHolderImpl(
- hashedWheelTimer,
threadPool,
singletonServicesProvider,
entityOwnershipService,
- mastershipChangeServiceManager
- );
+ mastershipChangeServiceManager);
statisticsManager.setReconciliationFrameworkRegistrar(mastershipChangeServiceManager);
contextChainHolder.addManager(deviceManager);
contextChainHolder.addManager(statisticsManager);
contextChainHolder.addManager(rpcManager);
+ contextChainHolder.addManager(roleManager);
connectionManager = new ConnectionManagerImpl(config, threadPool);
connectionManager.setDeviceConnectedHandler(contextChainHolder);
gracefulShutdown(deviceManager);
gracefulShutdown(rpcManager);
gracefulShutdown(statisticsManager);
+ gracefulShutdown(roleManager);
gracefulShutdown(threadPool);
gracefulShutdown(hashedWheelTimer);
unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.util.HashedWheelTimer;
-import io.netty.util.TimerTask;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// TODO: high water mark factor should be parametrized
private static final float HIGH_WATERMARK_FACTOR = 0.95f;
- // Timeout in seconds after what we will give up on propagating role
- private static final int SET_ROLE_TIMEOUT = 10;
-
// Timeout in milliseconds after what we will give up on initializing device
private static final int DEVICE_INIT_TIMEOUT = 9000;
private SalRoleService salRoleService;
private ContextChainMastershipWatcher contextChainMastershipWatcher;
- DeviceContextImpl(
- @Nonnull final ConnectionContext primaryConnectionContext,
- @Nonnull final DataBroker dataBroker,
- @Nonnull final MessageSpy messageSpy,
- @Nonnull final TranslatorLibrary translatorLibrary,
- final ConvertorExecutor convertorExecutor,
- final boolean skipTableFeatures,
- final HashedWheelTimer hashedWheelTimer,
- final boolean useSingleLayerSerialization,
- final DeviceInitializerProvider deviceInitializerProvider,
- final boolean isFlowRemovedNotificationOn,
- final boolean switchFeaturesMandatory) {
+ DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
+ @Nonnull final DataBroker dataBroker,
+ @Nonnull final MessageSpy messageSpy,
+ @Nonnull final TranslatorLibrary translatorLibrary,
+ final ConvertorExecutor convertorExecutor,
+ final boolean skipTableFeatures,
+ final HashedWheelTimer hashedWheelTimer,
+ final boolean useSingleLayerSerialization,
+ final DeviceInitializerProvider deviceInitializerProvider,
+ final boolean isFlowRemovedNotificationOn,
+ final boolean switchFeaturesMandatory) {
this.primaryConnectionContext = primaryConnectionContext;
this.deviceInfo = primaryConnectionContext.getDeviceInfo();
return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
}
- @Override
- public void setSalRoleService(@Nonnull SalRoleService salRoleService) {
- this.salRoleService = salRoleService;
- }
-
@Override
public void instantiateServiceInstance() {
lazyTransactionManagerInitialization();
deviceInfo.toString()));
}
- Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER),
- new RpcResultFutureCallback(contextChainMastershipWatcher));
-
final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill = getDeviceFlowRegistry().fill();
Futures.addCallback(deviceFlowRegistryFill,
new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher));
return abstractRequestContext;
}
- private ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId());
- }
-
- final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
-
- if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) {
- final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
- .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())).build();
-
- setRoleOutputFuture = this.salRoleService.setRole(setRoleInput);
-
- final TimerTask timerTask = timeout -> {
- if (!setRoleOutputFuture.isDone()) {
- LOG.warn("New role {} was not propagated to device {} during {} sec", newRole, deviceInfo.getLOGValue(), SET_ROLE_TIMEOUT);
- setRoleOutputFuture.cancel(true);
- }
- };
-
- hashedWheelTimer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.SECONDS);
- } else {
- LOG.info("Device: {} with version: {} does not support role", deviceInfo.getLOGValue(), deviceInfo.getVersion());
- return Futures.immediateFuture(null);
- }
-
- return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
- }
-
- @Override
- public ListenableFuture<RpcResult<SetRoleOutput>> makeDeviceSlave() {
- return sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
- }
-
@Override
public void onStateAcquired(final ContextChainState state) {
hasState.set(true);
}
- private class RpcResultFutureCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
-
- private final ContextChainMastershipWatcher contextChainMastershipWatcher;
-
- RpcResultFutureCallback(final ContextChainMastershipWatcher contextChainMastershipWatcher) {
- this.contextChainMastershipWatcher = contextChainMastershipWatcher;
- }
-
- @Override
- public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
- this.contextChainMastershipWatcher.onMasterRoleAcquired(
- deviceInfo,
- ContextChainMastershipState.MASTER_ON_DEVICE
- );
- if (LOG.isDebugEnabled()) {
- LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo);
- }
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
- deviceInfo,
- "Was not able to set MASTER role on device");
- }
- }
-
private class DeviceFlowRegistryCallback implements FutureCallback<List<com.google.common.base.Optional<FlowCapableNode>>> {
private final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill;
private final ContextChainMastershipWatcher contextChainMastershipWatcher;
import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
-import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl;
import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtil;
import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
config.isEnableFlowRemovedNotification(),
config.isSwitchFeaturesMandatory());
- deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext));
((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
deviceContext.setNotificationPublishService(notificationPublishService);
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
-import io.netty.util.HashedWheelTimer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.MasterChecker;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.OwnershipChangeListener;
+import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
+import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
-import org.opendaylight.openflowplugin.impl.util.ItemScheduler;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
private static final Logger LOG = LoggerFactory.getLogger(ContextChainHolderImpl.class);
private static final String CONTEXT_CREATED_FOR_CONNECTION = " context created for connection: {}";
- private static final long CHECK_ROLE_MASTER_TIMEOUT = 20000L;
- private static final long CHECK_ROLE_MASTER_TOLERANCE = CHECK_ROLE_MASTER_TIMEOUT / 2;
private static final long REMOVE_DEVICE_FROM_DS_TIMEOUT = 5000L;
private static final String ASYNC_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";
private final Map<DeviceInfo, ContextChain> contextChainMap = new ConcurrentHashMap<>();
private final EntityOwnershipListenerRegistration eosListenerRegistration;
private final ClusterSingletonServiceProvider singletonServiceProvider;
- private final ItemScheduler<DeviceInfo, ContextChain> scheduler;
private final ExecutorService executorService;
private final OwnershipChangeListener ownershipChangeListener;
private DeviceManager deviceManager;
private RpcManager rpcManager;
private StatisticsManager statisticsManager;
+ private RoleManager roleManager;
- public ContextChainHolderImpl(final HashedWheelTimer timer,
- final ExecutorService executorService,
+ public ContextChainHolderImpl(final ExecutorService executorService,
final ClusterSingletonServiceProvider singletonServiceProvider,
final EntityOwnershipService entityOwnershipService,
final OwnershipChangeListener ownershipChangeListener) {
this.ownershipChangeListener.setMasterChecker(this);
this.eosListenerRegistration = Objects.requireNonNull(entityOwnershipService
.registerListener(ASYNC_SERVICE_ENTITY_TYPE, this));
-
- this.scheduler = new ItemScheduler<>(
- timer,
- CHECK_ROLE_MASTER_TIMEOUT,
- CHECK_ROLE_MASTER_TOLERANCE,
- ContextChain::makeDeviceSlave);
}
@Override
} else if (Objects.isNull(statisticsManager) && manager instanceof StatisticsManager) {
LOG.trace("Context chain holder: Statistics manager OK.");
statisticsManager = (StatisticsManager) manager;
+ } else if (Objects.isNull(roleManager) && manager instanceof RoleManager) {
+ LOG.trace("Context chain holder: Role manager OK.");
+ roleManager = (RoleManager) manager;
}
}
statisticsContext.registerMastershipWatcher(this);
LOG.debug("Statistics" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
+ final RoleContext roleContext = roleManager.createContext(deviceContext);
+ roleContext.registerMastershipWatcher(this);
+ LOG.debug("Role" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
+
final ContextChain contextChain = new ContextChainImpl(this, connectionContext,
executorService);
contextChain.registerDeviceRemovedHandler(deviceManager);
contextChain.registerDeviceRemovedHandler(rpcManager);
contextChain.registerDeviceRemovedHandler(statisticsManager);
+ contextChain.registerDeviceRemovedHandler(roleManager);
contextChain.registerDeviceRemovedHandler(this);
contextChain.addContext(deviceContext);
contextChain.addContext(rpcContext);
contextChain.addContext(statisticsContext);
+ contextChain.addContext(roleContext);
contextChainMap.put(deviceInfo, contextChain);
LOG.debug("Context chain" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
deviceContext.onPublished();
- scheduler.add(deviceInfo, contextChain);
- scheduler.startIfNotRunning();
- LOG.info("Started timer for setting SLAVE role on node {} if no role will be set in {}s.",
- deviceInfo,
- CHECK_ROLE_MASTER_TIMEOUT / 1000L);
-
contextChain.registerServices(singletonServiceProvider);
}
@Override
public void onMasterRoleAcquired(@Nonnull final DeviceInfo deviceInfo,
@Nonnull final ContextChainMastershipState mastershipState) {
- scheduler.remove(deviceInfo);
Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
if (ownershipChangeListener.isReconciliationFrameworkRegistered()) {
if (mastershipState == ContextChainMastershipState.INITIAL_SUBMIT) {
@Override
public void onSlaveRoleAcquired(final DeviceInfo deviceInfo) {
- scheduler.remove(deviceInfo);
ownershipChangeListener.becomeSlaveOrDisconnect(deviceInfo);
+ LOG.info("Role SLAVE was granted to device {}", deviceInfo);
Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(ContextChain::makeContextChainStateSlave);
}
@Override
- public void onSlaveRoleNotAcquired(final DeviceInfo deviceInfo) {
+ public void onSlaveRoleNotAcquired(final DeviceInfo deviceInfo, final String reason) {
+ LOG.warn("Not able to set SLAVE role on device {}, reason: {}", deviceInfo, reason);
Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> destroyContextChain(deviceInfo));
}
@VisibleForTesting
boolean checkAllManagers() {
- return Objects.nonNull(deviceManager) && Objects.nonNull(rpcManager) && Objects.nonNull(statisticsManager);
+ return Objects.nonNull(deviceManager)
+ && Objects.nonNull(rpcManager)
+ && Objects.nonNull(statisticsManager)
+ && Objects.nonNull(roleManager);
}
@Override
public void close() throws Exception {
- scheduler.close();
Map<DeviceInfo, ContextChain> copyOfChains = new HashMap<>(contextChainMap);
copyOfChains.keySet().forEach(this::destroyContextChain);
copyOfChains.clear();
}
private void destroyContextChain(final DeviceInfo deviceInfo) {
- scheduler.remove(deviceInfo);
-
+ ownershipChangeListener.becomeSlaveOrDisconnect(deviceInfo);
Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
deviceManager.sendNodeRemovedNotification(deviceInfo.getNodeInstanceIdentifier());
contextChain.close();
@Override
public void onDeviceRemoved(final DeviceInfo deviceInfo) {
- scheduler.remove(deviceInfo);
contextChainMap.remove(deviceInfo);
LOG.debug("Context chain removed for node {}", deviceInfo);
}
package org.opendaylight.openflowplugin.impl.lifecycle;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.openflowplugin.api.openflow.OFPContext;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainStateListener;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.GuardedContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
contextChainState.set(ContextChainState.CLOSED);
- contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
+ unMasterMe();
// Close all connections to devices
auxiliaryConnections.forEach(connectionContext -> connectionContext.closeConnection(false));
LOG.info("Registered clustering services for node {}", deviceInfo);
}
- @Override
- public void makeDeviceSlave() {
- unMasterMe();
-
- contexts.forEach(context -> {
- if (context.map(DeviceContext.class::isInstance)) {
- Futures.addCallback(context.map(DeviceContext.class::cast).makeDeviceSlave(),
- new DeviceSlaveCallback(), executorService);
- }
- });
- }
-
@Override
public boolean isMastered(@Nonnull ContextChainMastershipState mastershipState) {
switch (mastershipState) {
masterStateOnDevice.set(false);
rpcRegistration.set(false);
}
-
- private final class DeviceSlaveCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
- @Override
- public void onSuccess(@Nullable final RpcResult<SetRoleOutput> result) {
- contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
- }
-
- @Override
- public void onFailure(@Nonnull final Throwable t) {
- contextChainMastershipWatcher.onSlaveRoleNotAcquired(deviceInfo);
- }
- }
-}
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.role;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
+import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
+import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
+import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
+import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RoleContextImpl implements RoleContext {
+ private static final Logger LOG = LoggerFactory.getLogger(RoleContextImpl.class);
+
+ // Timeout after what we will give up on propagating role
+ private static final long SET_ROLE_TIMEOUT = 10000;
+
+ private final DeviceInfo deviceInfo;
+ private final HashedWheelTimer timer;
+ private final AtomicReference<ListenableFuture<RpcResult<SetRoleOutput>>> lastRoleFuture = new AtomicReference<>();
+ private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
+ private final Timeout slaveTask;
+ private ContextChainMastershipWatcher contextChainMastershipWatcher;
+ private SalRoleService roleService;
+
+ RoleContextImpl(@Nonnull final DeviceInfo deviceInfo,
+ @Nonnull final HashedWheelTimer timer,
+ final long checkRoleMasterTimeout) {
+ this.deviceInfo = deviceInfo;
+ this.timer = timer;
+ slaveTask = timer.newTimeout((t) -> makeDeviceSlave(), checkRoleMasterTimeout, TimeUnit.MILLISECONDS);
+
+ LOG.info("Started timer for setting SLAVE role on device {} if no role will be set in {}s.",
+ deviceInfo,
+ checkRoleMasterTimeout / 1000L);
+ }
+
+ @Override
+ public DeviceInfo getDeviceInfo() {
+ return deviceInfo;
+ }
+
+ @Override
+ public void setRoleService(final SalRoleService salRoleService) {
+ roleService = salRoleService;
+ }
+
+ @Override
+ public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) {
+ this.contextChainMastershipWatcher = contextChainMastershipWatcher;
+ }
+
+ @Override
+ public void close() {
+ slaveTask.cancel();
+ changeLastRoleFuture(null);
+ requestContexts.forEach(requestContext -> RequestContextUtil
+ .closeRequestContextWithRpcError(requestContext, "Connection closed."));
+ requestContexts.clear();
+ }
+
+ @Override
+ public void instantiateServiceInstance() {
+ final ListenableFuture<RpcResult<SetRoleOutput>> future = sendRoleChangeToDevice(OfpRole.BECOMEMASTER);
+ changeLastRoleFuture(future);
+ Futures.addCallback(future, new MasterRoleCallback(), MoreExecutors.directExecutor());
+ }
+
+ @Override
+ public ListenableFuture<Void> closeServiceInstance() {
+ changeLastRoleFuture(null);
+ return Futures.immediateFuture(null);
+ }
+
+ @Override
+ public <T> RequestContext<T> createRequestContext() {
+ final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
+ @Override
+ public void close() {
+ requestContexts.remove(this);
+ }
+ };
+
+ requestContexts.add(ret);
+ return ret;
+ }
+
+ @Nonnull
+ @Override
+ public ServiceGroupIdentifier getIdentifier() {
+ return deviceInfo.getServiceIdentifier();
+ }
+
+ private void changeLastRoleFuture(final ListenableFuture<RpcResult<SetRoleOutput>> newFuture) {
+ lastRoleFuture.getAndUpdate(lastFuture -> {
+ if (Objects.nonNull(lastFuture) && !lastFuture.isCancelled() && !lastFuture.isDone()) {
+ lastFuture.cancel(true);
+ }
+
+ return newFuture;
+ });
+ }
+
+ private ListenableFuture<RpcResult<SetRoleOutput>> makeDeviceSlave() {
+ final ListenableFuture<RpcResult<SetRoleOutput>> future = sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
+ changeLastRoleFuture(future);
+ Futures.addCallback(future, new SlaveRoleCallback(), MoreExecutors.directExecutor());
+ return future;
+ }
+
+ private ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
+ LOG.debug("Sending new role {} to device {}", newRole, deviceInfo);
+
+ if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) {
+ final SetRoleInput setRoleInput = new SetRoleInputBuilder()
+ .setControllerRole(newRole)
+ .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier()))
+ .build();
+
+ final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture = roleService.setRole(setRoleInput);
+
+ final TimerTask timerTask = timeout -> {
+ if (!setRoleOutputFuture.isDone()) {
+ LOG.warn("New role {} was not propagated to device {} during {} sec", newRole,
+ deviceInfo, SET_ROLE_TIMEOUT);
+ setRoleOutputFuture.cancel(true);
+ }
+ };
+
+ timer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.MILLISECONDS);
+ return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
+ }
+
+ LOG.info("Device: {} with version: {} does not support role {}", deviceInfo, deviceInfo.getVersion(), newRole);
+ return Futures.immediateFuture(null);
+ }
+
+ private final class MasterRoleCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
+ @Override
+ public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
+ slaveTask.cancel();
+ contextChainMastershipWatcher.onMasterRoleAcquired(
+ deviceInfo,
+ ContextChainMastershipState.MASTER_ON_DEVICE);
+ LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo);
+ }
+
+ @Override
+ public void onFailure(@Nonnull final Throwable throwable) {
+ slaveTask.cancel();
+ contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
+ deviceInfo,
+ "Was not able to propagate MASTER role on device. Error: " + throwable.toString());
+ }
+ }
+
+ private final class SlaveRoleCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
+ @Override
+ public void onSuccess(@Nullable final RpcResult<SetRoleOutput> result) {
+ slaveTask.cancel();
+ contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
+ LOG.debug("Role SLAVE was successfully set on device, node {}", deviceInfo);
+ }
+
+ @Override
+ public void onFailure(@Nonnull final Throwable throwable) {
+ slaveTask.cancel();
+ contextChainMastershipWatcher.onSlaveRoleNotAcquired(deviceInfo,
+ "Was not able to propagate SLAVE role on device. Error: " + throwable.toString());
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.role;
+
+import io.netty.util.HashedWheelTimer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.annotation.Nonnull;
+import org.opendaylight.openflowplugin.api.openflow.OFPContext;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
+import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
+import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
+import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl;
+
+public class RoleManagerImpl implements RoleManager {
+ // Timeout after what we will give up on waiting for master role
+ private static final long CHECK_ROLE_MASTER_TIMEOUT = 20000;
+
+ private final ConcurrentMap<DeviceInfo, RoleContext> contexts = new ConcurrentHashMap<>();
+ private final HashedWheelTimer timer;
+
+ public RoleManagerImpl(final HashedWheelTimer timer) {
+ this.timer = timer;
+ }
+
+ @Override
+ public RoleContext createContext(@Nonnull final DeviceContext deviceContext) {
+ final DeviceInfo deviceInfo = deviceContext.getDeviceInfo();
+ final RoleContextImpl roleContext = new RoleContextImpl(
+ deviceContext.getDeviceInfo(),
+ timer, CHECK_ROLE_MASTER_TIMEOUT);
+
+ roleContext.setRoleService(new SalRoleServiceImpl(roleContext, deviceContext));
+ contexts.put(deviceInfo, roleContext);
+ return roleContext;
+ }
+
+ @Override
+ public void onDeviceRemoved(final DeviceInfo deviceInfo) {
+ contexts.remove(deviceInfo);
+ }
+
+ @Override
+ public void close() {
+ contexts.values().forEach(OFPContext::close);
+ contexts.clear();
+ }
+}
\ No newline at end of file
package org.opendaylight.openflowplugin.impl.services.sal;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import java.math.BigInteger;
import java.util.concurrent.Future;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext.CONNECTION_STATE;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
import org.opendaylight.openflowplugin.api.openflow.device.Xid;
-import org.opendaylight.openflowplugin.impl.role.RoleChangeException;
import org.opendaylight.openflowplugin.impl.services.AbstractSimpleService;
import org.opendaylight.openflowplugin.impl.services.RoleService;
import org.opendaylight.openflowplugin.impl.services.util.ServiceException;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.ErrorType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
-import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class SalRoleServiceImpl extends AbstractSimpleService<SetRoleInput, SetRoleOutput> implements SalRoleService {
-
private static final Logger LOG = LoggerFactory.getLogger(SalRoleServiceImpl.class);
-
private static final BigInteger MAX_GENERATION_ID = new BigInteger("ffffffffffffffff", 16);
- private static final int MAX_RETRIES = 42;
-
- private static final String ROLE_REQUEST_UNSUPPORTED = ErrorType.ROLEREQUESTFAILED.name().concat(" code UNSUP");
-
private final DeviceContext deviceContext;
private final RoleService roleService;
public Future<RpcResult<SetRoleOutput>> setRole(final SetRoleInput input) {
LOG.info("SetRole called with input:{}", input);
- final SettableFuture<RpcResult<SetRoleOutput>> resultFuture = SettableFuture.create();
- repeaterForChangeRole(resultFuture, input, 0);
- /* Add Callback for release Guard */
- Futures.addCallback(resultFuture, new FutureCallback<RpcResult<SetRoleOutput>>() {
-
- @Override
- public void onSuccess(final RpcResult<SetRoleOutput> result) {
- LOG.debug("SetRoleService for Node: {} is ok Role: {}", input.getNode().getValue(),
- input.getControllerRole());
- }
-
- @Override
- public void onFailure(final Throwable t) {
- LOG.error("SetRoleService set Role {} for Node: {} fail . Reason {}", input.getControllerRole(),
- input.getNode().getValue(), t);
- }
- });
- return resultFuture;
- }
-
- private void repeaterForChangeRole(final SettableFuture<RpcResult<SetRoleOutput>> future, final SetRoleInput input,
- final int retryCounter) {
- if (future.isCancelled()) {
- future.setException(new RoleChangeException(String.format(
- "Set Role for device %s stop because Future was canceled", input.getNode().getValue())));
- return;
- }
- if (retryCounter >= MAX_RETRIES) {
- future.setException(new RoleChangeException(String.format("Set Role failed after %s tries on device %s",
- MAX_RETRIES, input.getNode().getValue())));
- return;
- }
// Check current connection state
final CONNECTION_STATE state = deviceContext.getPrimaryConnectionContext().getConnectionState();
switch (state) {
- case RIP:
- LOG.info("Device {} has been disconnected", input.getNode());
- future.setException(new Exception(String.format(
- "Device connection doesn't exist anymore. Primary connection status : %s", state)));
- return;
- case WORKING:
- // We can proceed
- LOG.trace("Device {} has been working", input.getNode());
- break;
- default:
- LOG.warn("Device {} is in state {}, role change is not allowed", input.getNode(), state);
- future.setException(new Exception(String.format("Unexcpected device connection status : %s", state)));
- return;
+ case RIP:
+ LOG.info("Device {} has been disconnected", input.getNode());
+ return Futures.immediateFailedFuture(new Exception(String
+ .format("Device connection doesn't exist anymore. Primary connection status : %s",
+ state)));
+ case WORKING:
+ // We can proceed
+ LOG.trace("Device {} has been working", input.getNode());
+ break;
+ default:
+ LOG.warn("Device {} is in state {}, role change is not allowed", input.getNode(), state);
+ return Futures.immediateFailedFuture(new Exception(String
+ .format("Unexpected device connection status : %s", state)));
}
LOG.info("Requesting state change to {}", input.getControllerRole());
- final ListenableFuture<RpcResult<SetRoleOutput>> changeRoleFuture = tryToChangeRole(input.getControllerRole());
- Futures.addCallback(changeRoleFuture, new FutureCallback<RpcResult<SetRoleOutput>>() {
-
- @Override
- public void onSuccess(final RpcResult<SetRoleOutput> result) {
- if (result.isSuccessful()) {
- LOG.debug("setRoleOutput received after roleChangeTask execution:{}", result);
- future.set(RpcResultBuilder.<SetRoleOutput> success().withResult(result.getResult()).build());
- } else {
- final boolean present = result
- .getErrors()
- .stream()
- .anyMatch(rpcError -> (rpcError.getMessage().contains(ROLE_REQUEST_UNSUPPORTED)));
-
- if (!present) {
- LOG.warn("setRole() failed with errors, will retry: {} times.", MAX_RETRIES - retryCounter);
- repeaterForChangeRole(future, input, (retryCounter + 1));
- } else {
- LOG.warn("setRole() failed with error - role request unsupported.");
- future.set(result);
- }
- }
- }
-
- @Override
- public void onFailure(final Throwable t) {
- if (!t.getMessage().contains(ROLE_REQUEST_UNSUPPORTED)) {
- LOG.warn("Exception in setRole(), will retry: {} times.", t, MAX_RETRIES - retryCounter);
- repeaterForChangeRole(future, input, (retryCounter + 1));
- } else {
- LOG.warn("Exception in setRole() - role request unsupported.", t);
- future.set(RpcResultBuilder.<SetRoleOutput>failed()
- .withError(RpcError.ErrorType.APPLICATION, t.getMessage()).build());
- }
- }
- });
+ return tryToChangeRole(input.getControllerRole());
}
private ListenableFuture<RpcResult<SetRoleOutput>> tryToChangeRole(final OfpRole role) {
final Future<BigInteger> generationFuture = roleService.getGenerationIdFromDevice(getVersion());
- return Futures.transformAsync(JdkFutureAdapters.listenInPoolThread(generationFuture), (AsyncFunction<BigInteger, RpcResult<SetRoleOutput>>) generationId -> {
+ return Futures.transformAsync(JdkFutureAdapters.listenInPoolThread(generationFuture), generationId -> {
LOG.debug("RoleChangeTask, GenerationIdFromDevice from device {} is {}", getDeviceInfo().getNodeId().getValue(), generationId);
final BigInteger nextGenerationId = getNextGenerationId(generationId);
LOG.debug("nextGenerationId received from device:{} is {}", getDeviceInfo().getNodeId().getValue(), nextGenerationId);
return BigInteger.ZERO;
}
}
-}
+}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.openflowplugin.impl.util;
-
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.Timeout;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ItemScheduler<K, V> implements AutoCloseable {
- private static final Logger LOG = LoggerFactory.getLogger(ItemScheduler.class);
-
- private final HashedWheelTimer hashedWheelTimer;
- private final Consumer<V> action;
- private final long timeoutMillis;
- private final long toleranceMillis;
- private final Map<K, V> items = Collections.synchronizedMap(new HashMap<>());
- private final Map<K, V> queue = Collections.synchronizedMap(new HashMap<>());
- private final Object scheduleLock = new Object();
-
- private volatile long startTime = -1;
- private volatile Timeout runningTimeout;
-
- /**
- * Instantiates a new Item scheduler.
- *
- * @param hashedWheelTimer the hashed wheel timer
- * @param timeoutMillis the timeout millis
- * @param toleranceMillis the tolerance millis
- * @param action the action
- */
- public ItemScheduler(final HashedWheelTimer hashedWheelTimer,
- final long timeoutMillis,
- final long toleranceMillis,
- final Consumer<V> action) {
- this.hashedWheelTimer = hashedWheelTimer;
- this.action = action;
- this.timeoutMillis = timeoutMillis;
- this.toleranceMillis = toleranceMillis;
- }
-
- /**
- * Start scheduler timeout if it is not already running and if there are any items scheduled
- */
- public void startIfNotRunning() {
- synchronized (scheduleLock) {
- if (Objects.nonNull(runningTimeout) || (items.isEmpty() && queue.isEmpty())) {
- LOG.debug("Scheduler {} is already running or nothing is scheduled, skipping start.", this);
- return;
- }
-
- startTime = System.currentTimeMillis();
- LOG.debug("Scheduler {} started with configured timeout {}ms and scheduling tolerance {}ms.",
- this, timeoutMillis, toleranceMillis);
-
- runningTimeout = hashedWheelTimer.newTimeout((timeout) -> {
- synchronized (scheduleLock) {
- LOG.debug("Running configured action on {} scheduled items for scheduler {}. There are {} items left in queue.",
- items.size(), this, queue.size());
- items.forEach((key, item) -> action.accept(item));
- items.clear();
- items.putAll(queue);
- queue.clear();
- close();
- }
-
- startIfNotRunning();
- }, timeoutMillis, TimeUnit.MILLISECONDS);
- }
- }
-
- /**
- * Schedule item for processing
- *
- * @param key the item key
- * @param item the item
- */
- public void add(final K key, final V item) {
- synchronized (scheduleLock) {
- final long currentTime = System.currentTimeMillis();
-
- if (currentTime - toleranceMillis <= startTime) {
- LOG.debug("Adding {} to scheduled items for scheduler {}.", key, this);
- items.put(key, item);
- } else {
- LOG.debug("Adding {} to scheduling queue for scheduler {}.", key, this);
- queue.put(key, item);
- }
- }
- }
-
- /**
- * Remove item for processing
- * @param key the item key
- */
- public void remove(final K key) {
- synchronized (scheduleLock) {
- LOG.debug("Removing {} from scheduled items and queue for scheduler {}", key, this);
- items.remove(key);
- queue.remove(key);
-
- if (items.isEmpty() && queue.isEmpty()) {
- close();
- }
- }
- }
-
- @Override
- public void close() {
- LOG.debug("Closing scheduler {} and cancelling all running tasks.", this);
- startTime = -1;
-
- if (Objects.nonNull(runningTimeout)) {
- runningTimeout.cancel();
- runningTimeout = null;
- }
- }
-}
deviceInitializerProvider,
true, false);
- deviceContext.setSalRoleService(salRoleService);
((DeviceContextImpl) deviceContext).lazyTransactionManagerInitialization();
deviceContextSpy = Mockito.spy(deviceContext);
package org.opendaylight.openflowplugin.impl.lifecycle;
import com.google.common.util.concurrent.Futures;
-import io.netty.util.HashedWheelTimer;
import java.util.concurrent.ExecutorService;
import org.junit.Assert;
import org.junit.Before;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
-import org.opendaylight.openflowplugin.api.openflow.lifecycle.OwnershipChangeListener;
import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
import org.opendaylight.openflowplugin.api.openflow.mastership.ReconciliationFrameworkEvent;
import org.opendaylight.openflowplugin.api.openflow.mastership.ReconciliationFrameworkRegistration;
+import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
+import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
private static final String ENTITY_TEST = "EntityTest";
private static final String OPENFLOW_TEST = "openflow:test";
@Mock
- private HashedWheelTimer timer;
- @Mock
private StatisticsManager statisticsManager;
@Mock
private RpcManager rpcManager;
@Mock
private DeviceManager deviceManager;
@Mock
+ private RoleManager roleManager;
+ @Mock
private StatisticsContext statisticsContext;
@Mock
private RpcContext rpcContext;
@Mock
private DeviceContext deviceContext;
@Mock
+ private RoleContext roleContext;
+ @Mock
private ConnectionContext connectionContext;
@Mock
private DeviceInfo deviceInfo;
@Mock
private EntityOwnershipListenerRegistration entityOwnershipListenerRegistration;
@Mock
- private OwnershipChangeListener ownershipChangeListener;
- @Mock
private ReconciliationFrameworkEvent reconciliationFrameworkEvent;
@Mock
private FeaturesReply featuresReply;
Mockito.when(deviceManager.createContext(connectionContext)).thenReturn(deviceContext);
Mockito.when(rpcManager.createContext(deviceContext)).thenReturn(rpcContext);
Mockito.when(statisticsManager.createContext(deviceContext)).thenReturn(statisticsContext);
- Mockito.when(deviceContext.makeDeviceSlave()).thenReturn(Futures.immediateFuture(null));
+ Mockito.when(roleManager.createContext(deviceContext)).thenReturn(roleContext);
Mockito.when(deviceContext.getDeviceInfo()).thenReturn(deviceInfo);
Mockito.when(singletonServicesProvider.registerClusterSingletonService(Mockito.any()))
registration = manager.reconciliationFrameworkRegistration(reconciliationFrameworkEvent);
contextChainHolder = new ContextChainHolderImpl(
- timer,
executorService,
singletonServicesProvider,
entityOwnershipService,
- manager
- );
+ manager);
contextChainHolder.addManager(statisticsManager);
contextChainHolder.addManager(rpcManager);
contextChainHolder.addManager(deviceManager);
+ contextChainHolder.addManager(roleManager);
}
@Test
Mockito.verify(deviceManager).createContext(Mockito.any(ConnectionContext.class));
Mockito.verify(rpcManager).createContext(Mockito.any(DeviceContext.class));
Mockito.verify(statisticsManager).createContext(Mockito.any(DeviceContext.class));
+ Mockito.verify(roleManager).createContext(Mockito.any(DeviceContext.class));
}
public void notAbleToSetSlave() throws Exception {
registration.close();
contextChainHolder.deviceConnected(connectionContext);
- contextChainHolder.onSlaveRoleNotAcquired(deviceInfo);
+ contextChainHolder.onSlaveRoleNotAcquired(deviceInfo, "Test reason");
Mockito.verify(deviceContext).close();
Mockito.verify(statisticsContext).close();
Mockito.verify(rpcContext).close();
contextChainHolder.ownershipChanged(ownershipChange);
Mockito.verify(deviceManager,Mockito.never()).removeDeviceFromOperationalDS(Mockito.any());
}
-}
\ No newline at end of file
+}
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
-import org.opendaylight.openflowplugin.impl.role.RoleChangeException;
@RunWith(MockitoJUnitRunner.class)
public class ContextChainImplTest {
Assert.assertEquals(contextChain.getIdentifier(), SERVICE_GROUP_IDENTIFIER);
}
- @Test
- public void makeDeviceSlave() throws Exception {
- Mockito.when(deviceContext.makeDeviceSlave()).thenReturn(Futures.immediateFuture(null));
- contextChain.makeDeviceSlave();
- Mockito.verify(contextChainMastershipWatcher).onSlaveRoleAcquired(Mockito.any(DeviceInfo.class));
- }
-
- @Test
- public void makeDeviceSlaveFailure() throws Exception {
- Mockito.when(deviceContext.makeDeviceSlave())
- .thenReturn(Futures.immediateFailedFuture(new RoleChangeException(TEST_NODE)));
- contextChain.makeDeviceSlave();
- Mockito.verify(contextChainMastershipWatcher).onSlaveRoleNotAcquired(Mockito.any(DeviceInfo.class));
- }
-
@Test
public void instantiateServiceInstanceFail() throws Exception {
Mockito.doThrow(new IllegalStateException()).when(deviceContext).instantiateServiceInstance();
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.role;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.util.concurrent.Futures;
+import io.netty.util.HashedWheelTimer;
+import java.util.concurrent.Future;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
+import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
+import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RoleContextImplTest {
+ @Mock
+ private SalRoleService roleService;
+ @Mock
+ private ContextChainMastershipWatcher contextChainMastershipWatcher;
+ @Mock
+ private DeviceInfo deviceInfo;
+ @Mock
+ private DeviceContext deviceContext;
+ @Mock
+ private Future<RpcResult<SetRoleOutput>> setRoleFuture;
+ private RoleContext roleContext;
+
+ @Before
+ public void setUp() throws Exception {
+ when(deviceInfo.getNodeInstanceIdentifier()).thenReturn(DeviceStateUtil
+ .createNodeInstanceIdentifier(new NodeId("openflow:1")));
+ when(deviceInfo.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_3);
+ when(roleService.setRole(any())).thenReturn(Futures.immediateFuture(null));
+
+ roleContext = new RoleContextImpl(deviceInfo, new HashedWheelTimer(), 20000);
+ roleContext.registerMastershipWatcher(contextChainMastershipWatcher);
+ roleContext.setRoleService(roleService);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ roleContext.close();
+ }
+
+ @Test
+ public void instantiateServiceInstance() throws Exception {
+ roleContext.instantiateServiceInstance();
+ verify(roleService).setRole(new SetRoleInputBuilder()
+ .setControllerRole(OfpRole.BECOMEMASTER)
+ .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier()))
+ .build());
+ verify(contextChainMastershipWatcher).onMasterRoleAcquired(
+ deviceInfo,
+ ContextChainMastershipState.MASTER_ON_DEVICE);
+ }
+
+ @Test
+ public void closeServiceInstance() throws Exception {
+ when(setRoleFuture.isCancelled()).thenReturn(false);
+ when(setRoleFuture.isDone()).thenReturn(false);
+ when(roleService.setRole(any())).thenReturn(setRoleFuture);
+ roleContext.instantiateServiceInstance();
+ roleContext.closeServiceInstance().get();
+ verify(setRoleFuture).cancel(true);
+ }
+
+}
\ No newline at end of file
package org.opendaylight.openflowplugin.impl.services.sal;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
}
- @Test
- public void testSetRoleUnsupported() throws Exception {
- ListenableFuture<RpcResult<RoleRequestOutput>> futureOutput =
- RpcResultBuilder.<RoleRequestOutput>failed()
- .withError(ErrorType.APPLICATION, ROLES_UNSUPPORTED)
- .buildFuture();
-
- Mockito.when(mockRequestContext.getFuture()).thenReturn(futureOutput);
-
- SalRoleService salRoleService = new SalRoleServiceImpl(mockRequestContextStack, mockDeviceContext);
-
- SetRoleInput setRoleInput = new SetRoleInputBuilder()
- .setControllerRole(OfpRole.BECOMESLAVE)
- .setNode(nodeRef)
- .build();
-
- Future<RpcResult<SetRoleOutput>> future = salRoleService.setRole(setRoleInput);
-
- RpcResult<SetRoleOutput> roleOutputRpcResult = future.get(5, TimeUnit.SECONDS);
- assertNotNull("RpcResult from future cannot be null.", roleOutputRpcResult);
- assertFalse("RpcResult from future is successful.", roleOutputRpcResult.isSuccessful());
- assertEquals(ROLES_UNSUPPORTED, roleOutputRpcResult
- .getErrors().iterator().next().getMessage());
- }
-
@Test
public void testDuplicateRoles() throws Exception {
// set role to slave