statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, isStatisticsPollingOff, conductor, convertorManager);
conductor.setSafelyManager(statisticsManager);
- rpcManager = new RpcManagerImpl(rpcProviderRegistry, rpcRequestsQuota, conductor);
+ rpcManager = new RpcManagerImpl(rpcProviderRegistry, rpcRequestsQuota, extensionConverterManager, conductor, convertorManager);
conductor.setSafelyManager(rpcManager);
roleManager.addRoleChangeListener((RoleChangeListener) conductor);
private final LifecycleConductor conductor;
public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
- final long globalNotificationQuota, final boolean switchFeaturesMandatory,
- final long barrierInterval, final int barrierCountLimit,
- final LifecycleConductor lifecycleConductor, boolean isNotificationFlowRemovedOff,
- final ConvertorExecutor convertorExecutor) {
+ final long globalNotificationQuota,
+ final boolean switchFeaturesMandatory,
+ final long barrierInterval,
+ final int barrierCountLimit,
+ final LifecycleConductor lifecycleConductor,
+ boolean isNotificationFlowRemovedOff,
+ final ConvertorExecutor convertorExecutor) {
this.switchFeaturesMandatory = switchFeaturesMandatory;
this.globalNotificationQuota = globalNotificationQuota;
this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
private final RoleContext roleContext;
private final StatisticsContext statContext;
- public LifecycleServiceImpl(
+ LifecycleServiceImpl(
final DeviceContext deviceContext,
final RpcContext rpcContext,
final RoleContext roleContext,
@Override
public void instantiateServiceInstance() {
- LOG.info("Starting device context cluster services for node {}", this.deviceContext.getServiceIdentifier());
try {
+
+ LOG.info("Starting device context cluster services for node {}", getIdentifier());
this.deviceContext.startupClusterServices();
+
+ LOG.info("Starting statistics context cluster services for node {}", getIdentifier());
+ this.statContext.startupClusterServices();
+
+ LOG.info("Starting rpc context cluster services for node {}", getIdentifier());
+ this.rpcContext.startupClusterServices();
+
+ LOG.info("Starting role context cluster services for node {}", getIdentifier());
+ this.roleContext.startupClusterServices();
+
} catch (ExecutionException | InterruptedException e) {
LOG.warn("Cluster service {} was unable to start.", this.getIdentifier());
}
- LOG.info("Starting statistics context cluster services for node {}", this.deviceContext.getServiceIdentifier());
}
@Override
public ListenableFuture<Void> closeServiceInstance() {
+ statContext.stopClusterServices();
+ rpcContext.stopClusterServices();
return deviceContext.stopClusterServices();
}
package org.opendaylight.openflowplugin.impl.role;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.util.TimerTask;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
+import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
return this.deviceInfo;
}
+ public void startupClusterServices() throws ExecutionException, InterruptedException {
+ sendRoleChangeToDevice(OfpRole.BECOMEMASTER).get();
+ }
+
+ @Override
+ public ListenableFuture<Void> stopClusterServices() {
+ return Futures.immediateFuture(null);
+ }
+
+ private ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
+ LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId());
+ final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
+ final Short version = deviceInfo.getVersion();
+ if (null == version) {
+ LOG.debug("Device version is null");
+ return Futures.immediateFuture(null);
+ }
+ if (version < OFConstants.OFP_VERSION_1_3) {
+ LOG.debug("Device version not support ROLE");
+ return Futures.immediateFuture(null);
+ } else {
+ final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
+ .setNode(new NodeRef(DeviceStateUtil.createNodeInstanceIdentifier(deviceInfo.getNodeId()))).build();
+ setRoleOutputFuture = getSalRoleService().setRole(setRoleInput);
+ final TimerTask timerTask = timeout -> {
+ if (!setRoleOutputFuture.isDone()) {
+ LOG.warn("New role {} was not propagated to device {} during 10 sec", newRole, deviceInfo.getNodeId());
+ setRoleOutputFuture.cancel(true);
+ }
+ };
+ conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS);
+ }
+ return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
+ }
+
}
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.device.XidSequencer;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
+import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
+import org.opendaylight.openflowplugin.impl.util.MdSalRegistrationUtils;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
private final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier;
private CONTEXT_STATE state;
private final DeviceInfo deviceInfo;
+ private final DeviceContext deviceContext;
+ private final ExtensionConverterProvider extensionConverterProvider;
+ private final ConvertorExecutor convertorExecutor;
RpcContextImpl(final DeviceInfo deviceInfo,
final RpcProviderRegistry rpcProviderRegistry,
final XidSequencer xidSequencer,
final MessageSpy messageSpy,
final int maxRequests,
- final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier) {
+ final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier,
+ final DeviceContext deviceContext,
+ final ExtensionConverterProvider extensionConverterProvider,
+ final ConvertorExecutor convertorExecutor) {
this.xidSequencer = Preconditions.checkNotNull(xidSequencer);
this.messageSpy = Preconditions.checkNotNull(messageSpy);
this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
this.nodeInstanceIdentifier = nodeInstanceIdentifier;
tracker = new Semaphore(maxRequests, true);
+ this.extensionConverterProvider = extensionConverterProvider;
setState(CONTEXT_STATE.WORKING);
this.deviceInfo = deviceInfo;
+ this.deviceContext = deviceContext;
+ this.convertorExecutor = convertorExecutor;
}
/**
public DeviceInfo getDeviceInfo() {
return this.deviceInfo;
}
+
+ @Override
+ public void startupClusterServices() throws ExecutionException, InterruptedException {
+ MdSalRegistrationUtils.registerServices(this, deviceContext, extensionConverterProvider, convertorExecutor);
+ }
+
+ @Override
+ public ListenableFuture<Void> stopClusterServices() {
+ MdSalRegistrationUtils.unregisterServices(this);
+ return Futures.immediateFuture(null);
+ }
}
import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
+import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final int maxRequestsQuota;
private final ConcurrentMap<DeviceInfo, RpcContext> contexts = new ConcurrentHashMap<>();
private boolean isStatisticsRpcEnabled;
-
+ private final ExtensionConverterProvider extensionConverterProvider;
private final LifecycleConductor conductor;
+ private final ConvertorExecutor convertorExecutor;
public RpcManagerImpl(
final RpcProviderRegistry rpcProviderRegistry,
final int quotaValue,
- final LifecycleConductor lifecycleConductor) {
+ ExtensionConverterProvider extensionConverterProvider,
+ final LifecycleConductor lifecycleConductor,
+ final ConvertorExecutor convertorExecutor) {
this.rpcProviderRegistry = rpcProviderRegistry;
maxRequestsQuota = quotaValue;
+ this.extensionConverterProvider = extensionConverterProvider;
this.conductor = lifecycleConductor;
+ this.convertorExecutor = convertorExecutor;
}
@Override
deviceContext,
deviceContext.getMessageSpy(),
maxRequestsQuota,
- deviceInfo.getNodeInstanceIdentifier());
+ deviceInfo.getNodeInstanceIdentifier(),
+ deviceContext,
+ extensionConverterProvider,
+ convertorExecutor);
Verify.verify(contexts.putIfAbsent(deviceInfo, rpcContext) == null, "RpcCtx still not closed for node {}", deviceInfo.getNodeId());
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.ExecutionException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
public DeviceInfo getDeviceInfo() {
return this.deviceInfo;
}
+
+ @Override
+ public void startupClusterServices() throws ExecutionException, InterruptedException {
+ this.statListForCollectingInitialization();
+ this.initialGatherDynamicData();
+ }
+
+ @Override
+ public ListenableFuture<Void> stopClusterServices() {
+ return Futures.immediateFuture(null);
+ }
}
import org.opendaylight.openflowplugin.api.openflow.device.XidSequencer;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
+import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
private TestRpcService serviceInstance;
@Mock
private DeviceInfo deviceInfo;
+ @Mock
+ private ExtensionConverterProvider extensionConverterProvider;
+ @Mock
+ private ConvertorExecutor convertorExecutor;
private KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier;
deviceContext,
messageSpy,
MAX_REQUESTS,
- nodeInstanceIdentifier);
+ nodeInstanceIdentifier,
+ deviceContext,
+ extensionConverterProvider,
+ convertorExecutor);
when(rpcProviderRegistry.addRoutedRpcImplementation(TestRpcService.class, serviceInstance)).thenReturn(routedRpcReg);
xidSequencer,
messageSpy,
100,
- nodeInstanceIdentifier)) {
+ nodeInstanceIdentifier,
+ deviceContext,
+ extensionConverterProvider,
+ convertorExecutor)) {
final RequestContext<?> requestContext = rpcContext.createRequestContext();
assertNotNull(requestContext);
}
xidSequencer,
messageSpy,
0,
- nodeInstanceIdentifier)) {
+ nodeInstanceIdentifier,
+ deviceContext,
+ extensionConverterProvider,
+ convertorExecutor)) {
final RequestContext<?> requestContext = rpcContext.createRequestContext();
assertNull(requestContext);
}
deviceContext,
messageSpy,
100,
- nodeInstanceIdentifier)) {
+ nodeInstanceIdentifier,
+ deviceContext,
+ extensionConverterProvider,
+ convertorExecutor)) {
final RequestContext<?> requestContext = rpcContext.createRequestContext();
assertNotNull(requestContext);
requestContext.close();
import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
+import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
private ConcurrentMap<DeviceInfo, RpcContext> contexts;
@Mock
private DeviceInfo deviceInfo;
+ @Mock
+ private ExtensionConverterProvider extensionConverterProvider;
+ @Mock
+ private ConvertorExecutor convertorExecutor;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void setUp() {
final NodeKey nodeKey = new NodeKey(nodeId);
- rpcManager = new RpcManagerImpl(rpcProviderRegistry, QUOTA_VALUE, conductor);
+ rpcManager = new RpcManagerImpl(rpcProviderRegistry, QUOTA_VALUE, extensionConverterProvider, conductor, convertorExecutor);
rpcManager.setDeviceInitializationPhaseHandler(deviceINitializationPhaseHandler);
GetFeaturesOutput featuresOutput = new GetFeaturesOutputBuilder()