import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
/**
* Created by kramesha on 9/12/15.
void setTxLockOwned(boolean txLockOwned);
- void promoteStateToWorking();
-
- OfpRole getPropagatingRole();
-
- void setPropagatingRole(OfpRole propagatingRole);
-
- /** available states the {@link RoleContext} can exist in */
- enum ROLE_CONTEXT_STATE {
- /**
- * before consequences of first entity ownership election are completely settled
- * (lock acquired, data written, role propagated onto device)
- */
- STARTING,
- /**
- * state between
- * <ul>
- * <li>first entity ownership election settled</li>
- * <li>and device disconnected or {@link DeviceContext#close()} invoked</li>
- * </ul>
- */
- WORKING,
- /** after {@link DeviceContext#close()} invoked */
- TEARING_DOWN
- }
-
/**
* Initialization method is responsible for a registration of
- * {@link org.opendaylight.controller.md.sal.common.api.clustering.Entity}
- * and listen for notification from service. {@link Future} returned object is used primary
+ * {@link org.opendaylight.controller.md.sal.common.api.clustering.Entity} and listen for notification from service.
+ * {@link Future} returned object is used primary
* for new connection initialization phase where we have to wait for actual Role.
* The {@link Future} has to be canceled if device is in disconnected state or when
* {@link org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService} returns
DeviceContext getDeviceContext();
- ROLE_CONTEXT_STATE getState();
-
boolean isTxLockOwned();
}
<S extends RpcService> S lookupRpcService(Class<S> serviceClass);
<S extends RpcService> void unregisterRpcServiceImplementation(Class<S> serviceClass);
+
+ void registerStatCompatibilityServices();
+
+ @Override
+ void close();
}
statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, isStatisticsPollingOff);
rpcManager = new RpcManagerImpl(rpcProviderRegistry, rpcRequestsQuota);
- // CM -> DM -> Role -> SM -> RPC -> DM
+ // CM -> DM -> SM -> RPC -> Role -> DM
connectionManager.setDeviceConnectedHandler(deviceManager);
- deviceManager.setDeviceInitializationPhaseHandler(roleManager);
- roleManager.setDeviceInitializationPhaseHandler(statisticsManager);
+ deviceManager.setDeviceInitializationPhaseHandler(statisticsManager);
statisticsManager.setDeviceInitializationPhaseHandler(rpcManager);
- rpcManager.setDeviceInitializationPhaseHandler(deviceManager);
+ rpcManager.setDeviceInitializationPhaseHandler(roleManager);
+ roleManager.setDeviceInitializationPhaseHandler(deviceManager);
+
rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
rpcManager.setNotificationPublishService(notificationPublishService);
}
if (OfpRole.BECOMEMASTER.equals(role)) {
if (!deviceState.deviceSynchronized()) {
- LOG.debug("Setup Device Ctx {} for Master Role", getDeviceState().getNodeId());
+ //TODO: no necessary code for yet - it needs for initialization phase only
+ LOG.debug("Setup Empty TxManager {} for initialization phase", getDeviceState().getNodeId());
transactionChainManager.activateTransactionManager();
return Futures.immediateCheckedFuture(null);
}
/* Relevant for no initial Slave-to-Master scenario in cluster */
return asyncClusterRoleChange(role);
+
} else if (OfpRole.BECOMESLAVE.equals(role)) {
- if (rpcContext != null) {
+ if (null != rpcContext) {
MdSalRegistratorUtils.registerSlaveServices(rpcContext, role);
}
return transactionChainManager.deactivateTransactionManager();
} else {
LOG.warn("Unknown OFCluster Role {} for Node {}", role, deviceState.getNodeId());
- if (rpcContext != null) {
+ if (null != rpcContext) {
MdSalRegistratorUtils.unregisterServices(rpcContext);
}
return transactionChainManager.deactivateTransactionManager();
new AsyncFunction<Optional<FlowCapableNode>, Void>() {
@Override
public ListenableFuture<Void> apply(final Optional<FlowCapableNode> input) throws Exception {
- if (!input.isPresent() || input.get().getTable().isEmpty()) {
+ if (!input.isPresent() || input.get().getTable() != null || input.get().getTable().isEmpty()) {
/* Last master close fail scenario so we would like to activate TxManager */
LOG.debug("Operational DS for Device {} has to be replaced", deviceState.getNodeId());
getDeviceState().setDeviceSynchronized(false);
return null;
}
LOG.debug("Get Initial Device {} information is successful", getDeviceState().getNodeId());
- if (null != getRpcContext()) {
+ getDeviceState().setDeviceSynchronized(true);
+ transactionChainManager.activateTransactionManager();
+ //TODO: This is relevant for slave to master scenario make verify
+ if (null != rpcContext) {
MdSalRegistratorUtils.registerMasterServices(getRpcContext(), DeviceContextImpl.this, role);
+ getRpcContext().registerStatCompatibilityServices();
+ } else {
+ LOG.warn("No RpcCtx on deviceCtx: {}, cannot register services", this);
+ // TODO : can we stay without RPCs or we need to call DeviceCtx.close ?
}
- getDeviceState().setDeviceSynchronized(true);
+ initialSubmitTransaction();
getDeviceState().setStatisticsPollingEnabledProp(true);
- transactionChainManager.activateTransactionManager();
return null;
}
});
@Override
public void onSuccess(final Void result) {
- LOG.info("TxChain {} was shutdown successfull.", deviceState.getNodeId());
+ LOG.info("TxChain {} was shutdown successfull.", getDeviceState().getNodeId());
tearDownClean();
}
@Override
public void onFailure(final Throwable t) {
- LOG.warn("Shutdown TxChain {} fail.", deviceState.getNodeId(), t);
+ LOG.warn("Shutdown TxChain {} fail.", getDeviceState().getNodeId(), t);
tearDownClean();
}
});
}
}
- synchronized void tearDownClean() {
+ protected void tearDownClean() {
LOG.info("Closing transaction chain manager without cleaning inventory operational");
Preconditions.checkState(!deviceState.isValid());
transactionChainManager.close();
*/
package org.opendaylight.openflowplugin.impl.device;
-import javax.annotation.CheckForNull;
-import javax.annotation.Nonnull;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import io.netty.util.HashedWheelTimer;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
-import io.netty.util.HashedWheelTimer;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.controller.md.sal.binding.api.NotificationService;
final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker,
hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, switchFeaturesMandatory);
+ Verify.verify(deviceContexts.putIfAbsent(connectionContext.getNodeId(), deviceContext) == null, "DeviceCtx still not closed.");
deviceContext.addDeviceContextClosedHandler(this);
- Verify.verify(deviceContexts.putIfAbsent(connectionContext.getNodeId(), deviceContext) == null);
((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
deviceContext.setNotificationService(notificationService);
* transactions. Call this method for MASTER role only.
*/
public void activateTransactionManager() {
- LOG.trace("activateTransactionManager for node {} transaction submit is set to {}", deviceState.getNodeId());
+ LOG.trace("activateTransactionManager for node {} transaction submit is set to {}", deviceState.getNodeId(), this.submitIsEnabled);
synchronized (txLock) {
if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) {
LOG.debug("Transaction Factory create {}", deviceState.getNodeId());
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
private final Semaphore mainCandidateGuard = new Semaphore(1, true);
private final Semaphore txCandidateGuard = new Semaphore(1, true);
- private volatile ROLE_CONTEXT_STATE state;
private volatile boolean txLockOwned;
- private volatile OfpRole propagatingRole;
public RoleContextImpl(final DeviceContext deviceContext, final EntityOwnershipService entityOwnershipService,
final Entity entity, final Entity txEnitity) {
@Override
public void initialization() throws CandidateAlreadyRegisteredException {
- state = ROLE_CONTEXT_STATE.STARTING;
- LOG.debug("Initialization requestOpenflowEntityOwnership for entity {}", entity);
- entityOwnershipCandidateRegistration = entityOwnershipService.registerCandidate(entity);
- LOG.debug("RoleContextImpl : Candidate registered with ownership service for device :{}",
- deviceContext.getPrimaryConnectionContext().getNodeId().getValue());
+ LOG.debug("Initialization RoleContext for Node {}", deviceContext.getDeviceState().getNodeId());
+ final AsyncFunction<RpcResult<SetRoleOutput>, Void> initFunction = new AsyncFunction<RpcResult<SetRoleOutput>, Void>() {
+ @Override
+ public ListenableFuture<Void> apply(final RpcResult<SetRoleOutput> input) throws Exception {
+ LOG.debug("Initialization request OpenflowEntityOwnership for entity {}", entity);
+ getDeviceState().setRole(OfpRole.BECOMESLAVE);
+ entityOwnershipCandidateRegistration = entityOwnershipService.registerCandidate(entity);
+ LOG.debug("RoleContextImpl : Candidate registered with ownership service for device :{}", deviceContext
+ .getPrimaryConnectionContext().getNodeId().getValue());
+ return Futures.immediateFuture(null);
+ }
+ };
+ final ListenableFuture<Void> roleChange = sendRoleChangeToDevice(OfpRole.BECOMESLAVE, initFunction);
+ Futures.addCallback(roleChange, new FutureCallback<Void>() {
+
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.debug("Initial RoleContext for Node {} is successful", deviceContext.getDeviceState().getNodeId());
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.warn("Initial RoleContext for Node {} fail", deviceContext.getDeviceState().getNodeId(), t);
+ deviceContext.close();
+ }
+ });
}
@Override
LOG.debug("Role change received from ownership listener from {} to {} for device:{}", oldRole, newRole,
deviceContext.getPrimaryConnectionContext().getNodeId());
-
- final AsyncFunction<RpcResult<SetRoleOutput>, Void> roleChangeRpcFunction = new AsyncFunction<RpcResult<SetRoleOutput>, Void>() {
+ final AsyncFunction<RpcResult<SetRoleOutput>, Void> roleChangeFunction = new AsyncFunction<RpcResult<SetRoleOutput>, Void>() {
@Override
public ListenableFuture<Void> apply(final RpcResult<SetRoleOutput> setRoleOutputRpcResult) throws Exception {
- LOG.debug("Rolechange {} successful made on switch :{}", newRole, deviceContext.getDeviceState().getNodeId());
- final ListenableFuture<Void> nextStepFuture;
- switch (state) {
- case STARTING:
- if (OfpRole.BECOMESLAVE.equals(newRole)) {
- getDeviceState().setRole(newRole);
- nextStepFuture = Futures.immediateFuture(null);
- } else if (OfpRole.BECOMEMASTER.equals(newRole)) {
- nextStepFuture = deviceContext.onClusterRoleChange(oldRole, newRole);
- } else {
- nextStepFuture = Futures.immediateFuture(null);
- }
-
- break;
- case WORKING:
- nextStepFuture = deviceContext.onClusterRoleChange(oldRole, newRole);
- break;
- //case TEARING_DOWN:
- default:
- nextStepFuture = Futures.immediateFuture(null);
- break;
- }
-
- return nextStepFuture;
+ LOG.debug("Role change {} successful made on switch :{}", newRole, deviceContext.getDeviceState()
+ .getNodeId());
+ getDeviceState().setRole(newRole);
+ return deviceContext.onClusterRoleChange(oldRole, newRole);
}
};
-
- return sendRoleChangeToDevice(newRole, roleChangeRpcFunction);
+ return sendRoleChangeToDevice(newRole, roleChangeFunction);
}
@Override
LOG.debug("Closing EntityOwnershipCandidateRegistration for {}", entity);
entityOwnershipCandidateRegistration.close();
}
- promoteStateToTearingDown();
}
@Override
return txCandidateGuard;
}
- @Override
- public ROLE_CONTEXT_STATE getState() {
- return state;
- }
-
@Override
public boolean isTxLockOwned() {
return txLockOwned;
this.txLockOwned = txLockOwned;
}
- private void promoteStateToTearingDown() {
- state = ROLE_CONTEXT_STATE.TEARING_DOWN;
- }
-
- @Override
- public void promoteStateToWorking() {
- state = ROLE_CONTEXT_STATE.WORKING;
- }
-
- @Override
- public OfpRole getPropagatingRole() {
- return propagatingRole;
- }
-
- @Override
- public void setPropagatingRole(final OfpRole propagatingRole) {
- this.propagatingRole = propagatingRole;
- }
-
private ListenableFuture<Void> sendRoleChangeToDevice(final OfpRole newRole, final AsyncFunction<RpcResult<SetRoleOutput>, Void> function) {
LOG.debug("Send new Role {} to Device {}", newRole, deviceContext.getDeviceState().getNodeId());
final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
-import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.opendaylight.openflowplugin.api.openflow.role.RoleChangeListener;
import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
-import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
import org.slf4j.Logger;
@Override
public void onDeviceContextLevelUp(@CheckForNull final DeviceContext deviceContext) throws Exception {
LOG.debug("RoleManager called for device:{}", deviceContext.getPrimaryConnectionContext().getNodeId());
-
final RoleContext roleContext = new RoleContextImpl(deviceContext, entityOwnershipService,
makeEntity(deviceContext.getDeviceState().getNodeId()),
makeTxEntity(deviceContext.getDeviceState().getNodeId()));
+
+ Verify.verify(contexts.putIfAbsent(roleContext.getEntity(), roleContext) == null, "RoleCtx for master Node {} is still not closed.", deviceContext.getDeviceState().getNodeId());
+ Verify.verify(!txContexts.containsKey(roleContext.getTxEntity()),
+ "RoleCtx for master Node {} is still not closed. TxEntity was not unregistered yet.", deviceContext.getDeviceState().getNodeId());
+
// if the device context gets closed (mostly on connection close), we would need to cleanup
deviceContext.addDeviceContextClosedHandler(this);
- final RoleContext previousContext = contexts.putIfAbsent(roleContext.getEntity(), roleContext);
- Verify.verify(previousContext == null,
- "RoleCtx for master Node {} is still not close.", deviceContext.getDeviceState().getNodeId());
-
roleContext.initialization();
+ deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
}
void getRoleContextLevelUp(final DeviceContext deviceContext) {
if (!txContexts.containsKey(roleContext.getTxEntity())) {
try {
txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext);
- roleContext.setPropagatingRole(OfpRole.BECOMEMASTER);
roleContext.setupTxCandidate();
// we'd like to wait for registration response
return;
final DeviceContext deviceContext = roleContext.getDeviceContext();
final NodeId nodeId = roleContext.getDeviceState().getNodeId();
- if (!roleContext.getDeviceState().isValid()
- && RoleContext.ROLE_CONTEXT_STATE.WORKING.equals(roleContext.getState())) {
- LOG.debug("Node {} ownership changed during closing process", roleContext.getDeviceState().getNodeId());
- roleContext.close();
- txCandidateGuard.release();
- return;
- }
-
if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
// SLAVE -> MASTER - acquired transition lock
LOG.debug("Acquired tx-lock for entity {}", ownershipChange.getEntity());
roleContext.setTxLockOwned(true);
final OfpRole role = roleContext.getDeviceState().getRole();
- Verify.verify(OfpRole.BECOMEMASTER.equals(roleContext.getPropagatingRole()),
+ /* SLAVE to MASTER scenario has wait for TxEntity LEADER before sending ROLE to Device */
+ Verify.verify(OfpRole.BECOMESLAVE.equals(role),
"Acquired tx-lock but current role = {}", role);
- switch (roleContext.getState()) {
- case STARTING:
- processingClosure = roleContext.onRoleChanged(OfpRole.BECOMESLAVE, OfpRole.BECOMEMASTER);
- // activate stats - accomplished automatically by chaging role in deviceState
- // collect initial dynamic data from device
- processingClosure = Futures.transform(processingClosure, new AsyncFunction<Void, Void>() {
- @Nullable
- @Override
- public ListenableFuture<Void> apply(@Nullable final Void aVoid) {
- deviceContext.getDeviceState().setRole(OfpRole.BECOMEMASTER);
- return DeviceInitializationUtils.initializeNodeInformation(
- deviceContext, switchFeaturesMandatory);
- }
- });
- break;
- case WORKING:
- // activate txChainManager, activate rpcs
- processingClosure = roleContext.onRoleChanged(OfpRole.BECOMESLAVE, OfpRole.BECOMEMASTER);
- // activate stats - accomplished automatically by chaging role in deviceState
- processingClosure = Futures.transform(processingClosure, new Function<Void, Void>() {
- @Nullable
- @Override
- public Void apply(@Nullable final Void aVoid) {
- deviceContext.getDeviceState().setRole(OfpRole.BECOMEMASTER);
- return null;
- }
- });
- break;
- //case TEARING_DOWN:
- default:
- //TODO: reconsider if there is really nothing to do when tearing down
- processingClosure = Futures.immediateFuture(null);
- break;
+ // activate txChainManager, activate rpcs
+ if (roleContext.getDeviceState().isValid()) {
+ processingClosure = roleContext.onRoleChanged(OfpRole.BECOMESLAVE, OfpRole.BECOMEMASTER);
+ } else {
+ // We are not able to send anything to device, but we need to handle closing state clearly
+ roleContext.close();
+ processingClosure = Futures.immediateFuture(null);
}
+ // activate stats - accomplished automatically by changing role in deviceState
+ processingClosure = Futures.transform(processingClosure, new Function<Void, Void>() {
+ @Nullable
+ @Override
+ public Void apply(@Nullable final Void aVoid) {
+ deviceContext.getDeviceState().setRole(OfpRole.BECOMEMASTER);
+ return null;
+ }
+ });
} else if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
// MASTER -> SLAVE - released tx-lock
LOG.debug("Released tx-lock for entity {}", ownershipChange.getEntity());
@Override
public void onSuccess(@Nullable final Void aVoid) {
// propagating role must be BECOMEMASTER in order to run this processing
- // removing it will disable redundand processing of BECOMEMASTER
- roleContext.setPropagatingRole(null);
-
+ // removing it will disable redundant processing of BECOMEMASTER
txCandidateGuard.release();
- switch (roleContext.getState()) {
- case STARTING:
- LOG.debug("init steps protected by tx-lock for node {} are done.", nodeId);
- roleContext.promoteStateToWorking();
- getRoleContextLevelUp(deviceContext);
- break;
- case WORKING:
- LOG.debug("normal steps protected by tx-lock for node {} are done.", nodeId);
- break;
- case TEARING_DOWN:
- LOG.debug("teardown steps protected by tx-lock for node {} are done.", nodeId);
- break;
- }
}
@Override
public void onFailure(final Throwable throwable) {
- LOG.warn("Unexpected error for Node {}, state={}, txLock={} -> terminating device context",
- nodeId, roleContext.getState(), roleContext.isTxLockOwned(), throwable);
+ LOG.warn("Unexpected error for Node {}, txLock={} -> terminating device context", nodeId,
+ roleContext.isTxLockOwned(), throwable);
txCandidateGuard.release();
deviceContext.close();
}
LOG.debug("Node {} is marked as LEADER", nodeId);
Verify.verify(txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext) == null,
"RoleCtx for TxEntity {} master Node {} is still not closed.", roleContext.getTxEntity(), nodeId);
- roleContext.setPropagatingRole(OfpRole.BECOMEMASTER);
-
// try to register tx-candidate via ownership service
roleContext.setupTxCandidate();
} catch (final CandidateAlreadyRegisteredException e) {
// withdraw context from map in order to have it as before
txContexts.remove(roleContext.getTxEntity(), roleContext);
// no more propagating any role - there is no txCandidate lock approaching
- roleContext.setPropagatingRole(null);
roleContext.getDeviceContext().close();
}
return null;
if (ownershipChange.wasOwner() && !ownershipChange.isOwner() && ownershipChange.hasOwner()) {
// MASTER -> SLAVE
rolePropagationFx = roleContext.onRoleChanged(oldRole, newRole);
- if (RoleContext.ROLE_CONTEXT_STATE.WORKING.equals(roleContext.getState())) {
- txProcessCallback = makeTxEntitySuspendCallback(roleContext);
- } else {
- txProcessCallback = null;
- }
+ txProcessCallback = makeTxEntitySuspendCallback(roleContext);
} else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.hasOwner()) {
// SLAVE -> MASTER
txProcessCallback = makeTxEntitySetupCallback(roleContext);
- } else if (!ownershipChange.wasOwner() && !ownershipChange.isOwner() && ownershipChange.hasOwner()) {
- if (RoleContext.ROLE_CONTEXT_STATE.STARTING.equals(roleContext.getState())) {
- rolePropagationFx = roleContext.onRoleChanged(oldRole, newRole);
- }
- txProcessCallback = null;
} else {
- LOG.trace("Main candidate role change case not covered: {} -> {} .. NOOP", oldRole, newRole);
+ LOG.debug("Main candidate role change case not covered: {} -> {} .. NOOP", oldRole, newRole);
txProcessCallback = null;
}
public void onSuccess(@Nullable final Void aVoid) {
LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}",
ownershipChange.getEntity(), oldRole, newRole);
- roleContext.setPropagatingRole(newRole);
mainCandidateGuard.release();
}
@Override
public void onFailure(final Throwable throwable) {
LOG.warn("Main candidate role propagation FAILED for entity: {}, {} -> {}",
- ownershipChange.getEntity(), oldRole, newRole);
+ ownershipChange.getEntity(), oldRole, newRole, throwable);
mainCandidateGuard.release();
roleContext.getDeviceContext().close();
}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
-
+import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleSource;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
+import org.opendaylight.openflowplugin.impl.util.MdSalRegistratorUtils;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
import org.opendaylight.yangtools.yang.binding.RpcService;
import org.slf4j.Logger;
// TODO: add private Sal salBroker
private final ConcurrentMap<Class<?>, RoutedRpcRegistration<?>> rpcRegistrations = new ConcurrentHashMap<>();
+ private final boolean isStatisticsRpcEnabled;
+ private final NotificationPublishService notificationPublishService;
- public RpcContextImpl(final MessageSpy messageSpy, final RpcProviderRegistry rpcProviderRegistry, final DeviceContext deviceContext, final int maxRequests) {
- this.messageSpy = messageSpy;
- this.rpcProviderRegistry = rpcProviderRegistry;
+ public RpcContextImpl(final MessageSpy messageSpy, final RpcProviderRegistry rpcProviderRegistry, final DeviceContext deviceContext,
+ final int maxRequests, final boolean isStatisticsRpcEnabled,
+ final NotificationPublishService notificationPublishService) {
this.deviceContext = Preconditions.checkNotNull(deviceContext);
+ this.messageSpy = Preconditions.checkNotNull(messageSpy);
+ this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
+ this.isStatisticsRpcEnabled = isStatisticsRpcEnabled;
+ this.notificationPublishService = notificationPublishService;
tracker = new Semaphore(maxRequests, true);
}
@Override
public <S extends RpcService> void registerRpcServiceImplementation(final Class<S> serviceClass,
final S serviceInstance) {
+ LOG.trace("Try to register service {} for device {}.", serviceClass, deviceContext.getDeviceState().getNodeInstanceIdentifier());
if (! rpcRegistrations.containsKey(serviceClass)) {
final RoutedRpcRegistration<S> routedRpcReg = rpcProviderRegistry.addRoutedRpcImplementation(serviceClass, serviceInstance);
routedRpcReg.registerPath(NodeContext.class, deviceContext.getDeviceState().getNodeInstanceIdentifier());
rpcRegistrations.put(serviceClass, routedRpcReg);
+ LOG.debug("Registration of service {} for device {}.", serviceClass, deviceContext.getDeviceState().getNodeInstanceIdentifier());
}
- LOG.debug("Registration of service {} for device {}.", serviceClass, deviceContext.getDeviceState().getNodeInstanceIdentifier());
+ }
- if (serviceInstance instanceof ItemLifeCycleSource) {
- // TODO: collect registration for selective unregistering in case of tearing down only one rpc
- deviceContext.getItemLifeCycleSourceRegistry().registerLifeCycleSource((ItemLifeCycleSource) serviceInstance);
+ public void registerStatCompatibilityServices() {
+ if (isStatisticsRpcEnabled) {
+ MdSalRegistratorUtils.registerStatCompatibilityServices(RpcContextImpl.this, deviceContext,
+ notificationPublishService, new AtomicLong());
}
}
@Override
- public <S extends RpcService> S lookupRpcService(Class<S> serviceClass) {
- S service = null;
- for (RoutedRpcRegistration<?> rpcRegistration : rpcRegistrations.values()) {
- final RpcService rpcService = rpcRegistration.getInstance();
- if (serviceClass.isInstance(rpcService)) {
- service = (S) rpcService;
- break;
- }
- }
- return service;
+ public <S extends RpcService> S lookupRpcService(final Class<S> serviceClass) {
+ final RpcService rpcService = rpcRegistrations.get(serviceClass).getInstance();
+ return (S) rpcService;
}
/**
* Unregisters all services.
LOG.debug("Closing RPC Registration of service {} for device {}.", rpcRegistration.getServiceType(),
deviceContext.getDeviceState().getNodeInstanceIdentifier());
}
+ rpcRegistrations.clear();
}
@Override
LOG.info("Acquired semaphore for {}, available permits:{} ", deviceContext.getDeviceState().getNodeId(), tracker.availablePermits());
}
- final Long xid = deviceContext.reservedXidForDeviceMessage();
- if (xid == null) {
- LOG.error("Xid cannot be reserved for new RequestContext, node:{}", deviceContext.getDeviceState().getNodeId());
- }
-
return new AbstractRequestContext<T>(deviceContext.reservedXidForDeviceMessage()) {
@Override
public void close() {
@Override
public <S extends RpcService> void unregisterRpcServiceImplementation(final Class<S> serviceClass) {
- LOG.debug("Unregistration serviceClass {} for Node {}", serviceClass, deviceContext.getDeviceState().getNodeId());
+ LOG.trace("Try to unregister serviceClass {} for Node {}", serviceClass, deviceContext.getDeviceState().getNodeId());
final RoutedRpcRegistration<?> rpcRegistration = rpcRegistrations.remove(serviceClass);
if (rpcRegistration != null) {
rpcRegistration.unregisterPath(NodeContext.class, deviceContext.getDeviceState().getNodeInstanceIdentifier());
rpcRegistration.close();
+ LOG.debug("Unregistration serviceClass {} for Node {}", serviceClass, deviceContext.getDeviceState().getNodeId());
}
}
}
*/
package org.opendaylight.openflowplugin.impl.rpc;
+import com.google.common.base.Verify;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
+import java.util.concurrent.ConcurrentMap;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
private final RpcProviderRegistry rpcProviderRegistry;
private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
private final int maxRequestsQuota;
- private final ConcurrentHashMap<DeviceContext, RpcContext> contexts = new ConcurrentHashMap<>();
+ private final ConcurrentMap<DeviceContext, RpcContext> contexts = new ConcurrentHashMap<>();
private boolean isStatisticsRpcEnabled;
private NotificationPublishService notificationPublishService;
final OfpRole ofpRole = deviceContext.getDeviceState().getRole();
LOG.debug("Node:{}, deviceContext.getDeviceState().getRole():{}", nodeId, ofpRole);
+ final RpcContext rpcContext = new RpcContextImpl(deviceContext.getMessageSpy(), rpcProviderRegistry,
+ deviceContext, maxRequestsQuota, isStatisticsRpcEnabled, notificationPublishService);
- RpcContext rpcContext = contexts.get(deviceContext);
- if (rpcContext == null) {
- rpcContext = new RpcContextImpl(deviceContext.getMessageSpy(), rpcProviderRegistry, deviceContext, maxRequestsQuota);
- contexts.put(deviceContext, rpcContext);
- }
-
+ Verify.verify(contexts.putIfAbsent(deviceContext, rpcContext) == null, "RpcCtx still not closed for node {}", nodeId);
deviceContext.addDeviceContextClosedHandler(this);
+ //FIXME : propagate isStatisticsRpcEnabled to DeviceContext
+
if (OfpRole.BECOMEMASTER.equals(ofpRole)) {
LOG.info("Registering Openflow RPCs for node:{}, role:{}", nodeId, ofpRole);
MdSalRegistratorUtils.registerMasterServices(rpcContext, deviceContext, ofpRole);
- if (isStatisticsRpcEnabled) {
- MdSalRegistratorUtils.registerStatCompatibilityServices(rpcContext, deviceContext,
- notificationPublishService, new AtomicLong());
- }
} else if(OfpRole.BECOMESLAVE.equals(ofpRole)) {
// if slave, we need to de-register rpcs if any have been registered, in case of master to slave
LOG.info("Unregistering RPC registration (if any) for slave role for node:{}", deviceContext.getDeviceState().getNodeId());
@Override
public void close() throws Exception {
-
+ for(final RpcContext ctx : contexts.values()) {
+ ctx.close();
+ }
+ contexts.clear();
}
public void onDeviceContextClosed(final DeviceContext deviceContext) {
final RpcContext removedContext = contexts.remove(deviceContext);
if (removedContext != null) {
- try {
- LOG.info("Unregistering rpcs for device context closure");
- removedContext.close();
- } catch (final Exception e) {
- LOG.error("Exception while unregistering rpcs onDeviceContextClosed handler for node:{}. But continuing.",
- deviceContext.getDeviceState().getNodeId(), e);
- }
+ LOG.info("Unregistering rpcs for device context closure");
+ removedContext.close();
}
}
@Override
statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext);
itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
statListForCollectingInitialization();
- deviceContext.setStatisticsContext(StatisticsContextImpl.this);
+ this.deviceContext.setStatisticsContext(StatisticsContextImpl.this);
}
@Override
statisticsGatheringService, deviceContext, /*MultipartType.OFPMPMETER*/ multipartType) : emptyFuture;
}
- @VisibleForTesting
- protected void setStatisticsGatheringService ( final StatisticsGatheringService statisticsGatheringService){
- this.statisticsGatheringService = statisticsGatheringService;
- }
+ @VisibleForTesting
+ protected void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) {
+ this.statisticsGatheringService = statisticsGatheringService;
+ }
- @VisibleForTesting
- protected void setStatisticsGatheringOnTheFlyService ( final StatisticsGatheringOnTheFlyService
- statisticsGatheringOnTheFlyService){
- this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
- }
+ @VisibleForTesting
+ protected void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService
+ statisticsGatheringOnTheFlyService) {
+ this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
+ }
@Override
public ItemLifecycleListener getItemLifeCycleListener () {
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.CheckForNull;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
private HashedWheelTimer hashedWheelTimer;
- private final ConcurrentHashMap<DeviceContext, StatisticsContext> contexts = new ConcurrentHashMap<>();
+ private final ConcurrentMap<DeviceContext, StatisticsContext> contexts = new ConcurrentHashMap<>();
private static final long basicTimerDelay = 3000;
private static long currentTimerDelay = basicTimerDelay;
hashedWheelTimer = deviceContext.getTimer();
}
final StatisticsContext statisticsContext = new StatisticsContextImpl(deviceContext, shuttingDownStatisticsPolling);
+
+ Verify.verify(contexts.putIfAbsent(deviceContext, statisticsContext) == null, "StatisticsCtx still not closed for Node {}",deviceContext.getDeviceState().getNodeId());
deviceContext.addDeviceContextClosedHandler(this);
if (shuttingDownStatisticsPolling) {
}
scheduleNextPolling(deviceContext, statisticsContext, new TimeCounter());
}
- contexts.put(deviceContext, statisticsContext);
deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
deviceContext.getDeviceState().setDeviceSynchronized(true);
}
*/
package org.opendaylight.openflowplugin.impl.util;
-import java.util.concurrent.atomic.AtomicLong;
-
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.reflect.TypeToken;
Preconditions.checkArgument(rpcContext != null);
Preconditions.checkArgument(newRole != null);
Verify.verify(OfpRole.BECOMESLAVE.equals(newRole), "Service call with bad Role {} we expect role BECOMESLAVE", newRole);
-
+
unregisterServices(rpcContext);
}
import org.mockito.ArgumentMatcher;
import org.mockito.Matchers;
import org.mockito.Mock;
-import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
when(deviceContext.getPrimaryConnectionContext().getFeatures()).thenReturn(featuresReply);
when(deviceContext.getPrimaryConnectionContext().getConnectionState()).thenReturn(ConnectionContext.CONNECTION_STATE.WORKING);
when(deviceContext.onClusterRoleChange(Matchers.<OfpRole>any(), Matchers.<OfpRole>any()))
- .thenReturn(Futures.immediateFuture((Void) null));
+ .thenReturn(Futures.immediateFuture((Void) null));
roleContext = new RoleContextImpl(deviceContext, entityOwnershipService, entity, txEntity);
roleContext.initialization();
final ListenableFuture<Void> onRoleChanged = roleContext.onRoleChanged(oldRole, newRole);
onRoleChanged.get(5, TimeUnit.SECONDS);
- verify(deviceContext, Mockito.never()).onClusterRoleChange(oldRole, newRole);
+ verify(deviceContext).onClusterRoleChange(oldRole, newRole);
}
@Test
.thenReturn(future);
roleContext.setSalRoleService(salRoleService);
- roleContext.promoteStateToWorking();
final ListenableFuture<Void> onRoleChanged = roleContext.onRoleChanged(oldRole, newRole);
onRoleChanged.get(5, TimeUnit.SECONDS);
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
private DeviceContext deviceContext;
@Mock
private MessageSpy messageSpy;
+ @Mock
+ private NotificationPublishService notificationPublishService;
private KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier;
@Before
public void setup() {
- NodeId nodeId = new NodeId("openflow:1");
+ final NodeId nodeId = new NodeId("openflow:1");
nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
when(deviceState.getNodeInstanceIdentifier()).thenReturn(nodeInstanceIdentifier);
@Test
public void testStoreOrFail() throws Exception {
- try (final RpcContext rpcContext = new RpcContextImpl(messageSpy, mockedRpcProviderRegistry, deviceContext, 100)) {
- RequestContext<?> requestContext = rpcContext.createRequestContext();
+ try (final RpcContext rpcContext = new RpcContextImpl(messageSpy, mockedRpcProviderRegistry, deviceContext,
+ 100, false, notificationPublishService)) {
+ final RequestContext<?> requestContext = rpcContext.createRequestContext();
assertNotNull(requestContext);
}
}
@Test
public void testStoreOrFailThatFails() throws Exception {
- try (final RpcContext rpcContext = new RpcContextImpl(messageSpy, mockedRpcProviderRegistry, deviceContext, 0)) {
- RequestContext<?> requestContext = rpcContext.createRequestContext();
+ try (final RpcContext rpcContext = new RpcContextImpl(messageSpy, mockedRpcProviderRegistry, deviceContext, 0,
+ false, notificationPublishService)) {
+ final RequestContext<?> requestContext = rpcContext.createRequestContext();
assertNull(requestContext);
}
}
import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry;
+import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
private DeviceState deviceState;
@Mock
private ItemLifeCycleRegistry itemLifeCycleRegistry;
+ @Mock
+ private MessageSpy messageSpy;
private KeyedInstanceIdentifier<Node, NodeKey> nodePath;
Mockito.when(deviceContext.getDeviceState().getRole()).thenReturn(OfpRole.BECOMEMASTER);
Mockito.when(deviceContext.getItemLifeCycleSourceRegistry()).thenReturn(itemLifeCycleRegistry);
Mockito.when(deviceState.getNodeInstanceIdentifier()).thenReturn(nodePath);
+ Mockito.when(deviceContext.getMessageSpy()).thenReturn(messageSpy);
}
@Test