package org.opendaylight.openflowplugin.api.openflow.device;
-import javax.annotation.CheckForNull;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.util.Timeout;
import java.math.BigInteger;
import java.util.List;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
-import io.netty.util.Timeout;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.controller.md.sal.binding.api.NotificationService;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
+import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
* newRole is {@link OfpRole#BECOMESLAVE}.
* Parameters are used as marker to be sure it is change to SLAVE from MASTER or from
* MASTER to SLAVE and the last parameter "cleanDataStore" is used for validation only.
+ * @param oldRole - old role for quick validation for needed processing
* @param role - NewRole expect to be {@link OfpRole#BECOMESLAVE} or {@link OfpRole#BECOMEMASTER}
*/
- ListenableFuture<Void> onClusterRoleChange(@CheckForNull OfpRole role);
+ ListenableFuture<Void> onClusterRoleChange(@Nullable OfpRole oldRole, @CheckForNull OfpRole role);
/**
* Method creates put operation using provided data in underlying transaction chain.
MultiMsgCollector getMultiMsgCollector(final RequestContext<List<MultipartReply>> requestContext);
- Long getReservedXid();
+ /**
+ * Method is reserved unique XID for Device Message.
+ * Attention: OFJava expect the message, otherwise OutboundQueue could stop working.
+ * @return Reserved XID
+ */
+ Long reservedXidForDeviceMessage();
/**
* indicates that device context is fully published (e.g.: packetIn messages should be passed)
RpcContext getRpcContext();
+ void setStatisticsContext(StatisticsContext statisticsContext);
+
+ StatisticsContext getStatisticsContext();
+
@Override
void close();
}
OfpRole getRole();
+ boolean isStatisticsPollingEnabled();
+
+ void setStatisticsPollingEnabledProp(boolean statPollEnabled);
}
ListenableFuture<Boolean> gatherDynamicData();
+ /**
+ * Method has to be called from DeviceInitialization Method, otherwise
+ * we are not able to poll anything. Statistics Context normally initialize
+ * this part by initialization process but we don't have this information
+ * in initialization phase and we have to populate whole list after every
+ * device future collecting. Because device future collecting set DeviceState
+ * and we creating marks for the correct kind of stats from DeviceState.
+ */
+ void statListForCollectingInitialization();
+
/**
* @param pollTimeout handle to nearest scheduled statistics poll
*/
* @return dedicated item life cycle change listener (per device)
*/
ItemLifecycleListener getItemLifeCycleListener();
+ @Override
+ void close();
}
registerMXBean(messageIntelligenceAgency);
- deviceManager = new DeviceManagerImpl(dataBroker, messageIntelligenceAgency, globalNotificationQuota);
+ deviceManager = new DeviceManagerImpl(dataBroker, messageIntelligenceAgency, globalNotificationQuota, switchFeaturesMandatory);
((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
roleManager = new RoleManagerImpl(entityOwnershipService, dataBroker, switchFeaturesMandatory);
statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, isStatisticsPollingOff);
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.opendaylight.controller.md.sal.binding.api.NotificationService;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
+import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
+import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
import org.opendaylight.openflowplugin.impl.util.MdSalRegistratorUtils;
import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
private RpcContext rpcContext;
private ExtensionConverterProvider extensionConverterProvider;
+ private final boolean switchFeaturesMandatory;
+ private StatisticsContext statCtx;
+
@VisibleForTesting
DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
@Nonnull final HashedWheelTimer hashedWheelTimer,
@Nonnull final MessageSpy _messageSpy,
@Nonnull final OutboundQueueProvider outboundQueueProvider,
- @Nonnull final TranslatorLibrary translatorLibrary) {
+ @Nonnull final TranslatorLibrary translatorLibrary,
+ final boolean switchFeaturesMandatory) {
+ this.switchFeaturesMandatory = switchFeaturesMandatory;
this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
this.deviceState = Preconditions.checkNotNull(deviceState);
this.dataBroker = Preconditions.checkNotNull(dataBroker);
}
@Override
- public Long getReservedXid() {
+ public Long reservedXidForDeviceMessage() {
return outboundQueueProvider.reserveEntry();
}
}
@Override
- public ListenableFuture<Void> onClusterRoleChange(@CheckForNull final OfpRole role) {
- LOG.debug("onClusterRoleChange {} for node:", role, deviceState.getNodeId());
+ public ListenableFuture<Void> onClusterRoleChange(final OfpRole oldRole, @CheckForNull final OfpRole role) {
+ LOG.trace("onClusterRoleChange {} for node:", role, deviceState.getNodeId());
Preconditions.checkArgument(role != null);
- if (rpcContext != null) {
- // TODO : implement interface for onClusterRoleChange method
- MdSalRegistratorUtils.registerServices(rpcContext, this, role);
+ if (role.equals(oldRole)) {
+ LOG.debug("Demanded role change for device {} is not change OldRole: {}, NewRole {}", deviceState.getNodeId(), oldRole, role);
+ return Futures.immediateFuture(null);
}
-
- final ListenableFuture<Void> nextStepFuture;
if (OfpRole.BECOMEMASTER.equals(role)) {
- transactionChainManager.activateTransactionManager();
- nextStepFuture = Futures.immediateCheckedFuture(null);
- getDeviceState().setRole(role);
+ if (!deviceState.deviceSynchronized()) {
+ LOG.debug("Setup Device Ctx {} for Master Role", getDeviceState().getNodeId());
+ transactionChainManager.activateTransactionManager();
+ return Futures.immediateCheckedFuture(null);
+ }
+ /* Relevant for no initial Slave-to-Master scenario in cluster */
+ return asyncClusterRoleChange(role);
} else if (OfpRole.BECOMESLAVE.equals(role)) {
- nextStepFuture = transactionChainManager.deactivateTransactionManager();
- Futures.transform(nextStepFuture, new Function<Void, Void>() {
- @Nullable
- @Override
- public Void apply(@Nullable final Void aVoid) {
- getDeviceState().setRole(role);
- return null;
- }
- });
+ if (rpcContext != null) {
+ MdSalRegistratorUtils.registerSlaveServices(rpcContext, role);
+ }
+ return transactionChainManager.deactivateTransactionManager();
} else {
- LOG.warn("Unknow OFCluster Role {} for Node {}", role, deviceState.getNodeId());
- nextStepFuture = Futures.immediateCheckedFuture(null);
+ LOG.warn("Unknown OFCluster Role {} for Node {}", role, deviceState.getNodeId());
+ if (rpcContext != null) {
+ MdSalRegistratorUtils.unregisterServices(rpcContext);
+ }
+ return transactionChainManager.deactivateTransactionManager();
+ }
+ }
+
+ /*
+ * we don't have active TxManager so anything will not be stored to DS yet, but we have
+ * check all NodeInformation for statistics otherwise statistics will not contains
+ * all possible MultipartTypes for polling in StatTypeList
+ */
+ private ListenableFuture<Void> asyncClusterRoleChange(final OfpRole role) {
+ if (statCtx == null) {
+ final String errMsg = String.format("DeviceCtx {} is up but we are missing StatisticsContext", deviceState.getNodeId());
+ LOG.warn(errMsg);
+ return Futures.immediateFailedFuture(new IllegalStateException(errMsg));
}
- return nextStepFuture;
+ final InstanceIdentifier<FlowCapableNode> ofNodeII = deviceState.getNodeInstanceIdentifier()
+ .augmentation(FlowCapableNode.class);
+ final ReadOnlyTransaction readTx = getReadTransaction();
+ final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> readOfNodeFuture = readTx.read(
+ LogicalDatastoreType.OPERATIONAL, ofNodeII);
+
+ final ListenableFuture<Void> nodeInitInfoFuture = Futures.transform(readOfNodeFuture,
+ new AsyncFunction<Optional<FlowCapableNode>, Void>() {
+ @Override
+ public ListenableFuture<Void> apply(final Optional<FlowCapableNode> input) throws Exception {
+ if (!input.isPresent() || input.get().getTable().isEmpty()) {
+ /* Last master close fail scenario so we would like to activate TxManager */
+ LOG.debug("Operational DS for Device {} has to be replaced", deviceState.getNodeId());
+ getDeviceState().setDeviceSynchronized(false);
+ transactionChainManager.activateTransactionManager();
+ }
+ return DeviceInitializationUtils.initializeNodeInformation(DeviceContextImpl.this, switchFeaturesMandatory);
+ }
+ });
+
+ final ListenableFuture<Boolean> statPollFuture = Futures.transform(nodeInitInfoFuture,
+ new AsyncFunction<Void, Boolean>() {
+
+ @Override
+ public ListenableFuture<Boolean> apply(final Void input) throws Exception {
+ getStatisticsContext().statListForCollectingInitialization();
+ if (getDeviceState().deviceSynchronized()) {
+ return Futures.immediateFuture(Boolean.TRUE);
+ }
+ return getStatisticsContext().gatherDynamicData();
+ }
+ });
+
+ return Futures.transform(statPollFuture, new Function<Boolean, Void>() {
+
+ @Override
+ public Void apply(final Boolean input) {
+ if (!input.booleanValue()) {
+ LOG.warn("Get Initial Device {} information fails", getDeviceState().getNodeId());
+ DeviceContextImpl.this.close();
+ return null;
+ }
+ LOG.debug("Get Initial Device {} information is successful", getDeviceState().getNodeId());
+ if (null != getRpcContext()) {
+ MdSalRegistratorUtils.registerMasterServices(getRpcContext(), DeviceContextImpl.this, role);
+ }
+ getDeviceState().setDeviceSynchronized(true);
+ getDeviceState().setStatisticsPollingEnabledProp(true);
+ transactionChainManager.activateTransactionManager();
+ return null;
+ }
+ });
}
@Override
@Override
public void onSuccess(final Void result) {
- LOG.info("Delete Node {} was successfull.", deviceState.getNodeId());
+ LOG.info("TxChain {} was shutdown successfull.", deviceState.getNodeId());
tearDownClean();
}
@Override
public void onFailure(final Throwable t) {
- LOG.warn("Delete Node {} fail.", deviceState.getNodeId(), t);
+ LOG.warn("Shutdown TxChain {} fail.", deviceState.getNodeId(), t);
tearDownClean();
}
});
public ExtensionConverterProvider getExtensionConverterProvider() {
return extensionConverterProvider;
}
+
+ @Override
+ public void setStatisticsContext(final StatisticsContext statisticsContext) {
+ this.statCtx = statisticsContext;
+ }
+
+ @Override
+ public StatisticsContext getStatisticsContext() {
+ return statCtx;
+ }
}
private static final long TICK_DURATION = 10; // 0.5 sec.
private final long globalNotificationQuota;
+ private final boolean switchFeaturesMandatory;
private ScheduledThreadPoolExecutor spyPool;
private final int spyRate = 10;
public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
@Nonnull final MessageIntelligenceAgency messageIntelligenceAgency,
- final long globalNotificationQuota) {
+ final long globalNotificationQuota, final boolean switchFeaturesMandatory) {
+ this.switchFeaturesMandatory = switchFeaturesMandatory;
this.globalNotificationQuota = globalNotificationQuota;
this.dataBroker = Preconditions.checkNotNull(dataBroker);
hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, 500);
@Override
public void onDeviceContextLevelUp(final DeviceContext deviceContext) {
// final phase - we have to add new Device to MD-SAL DataStore
+ LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceContext.getDeviceState().getNodeId());
Preconditions.checkNotNull(deviceContext);
try {
((DeviceContextImpl) deviceContext).initialSubmitTransaction();
LOG.trace("Problem with add node {} to OPERATIONAL DataStore", deviceContext.getDeviceState().getNodeId(), e);
try {
deviceContext.close();
- } catch (Exception e1) {
+ } catch (final Exception e1) {
LOG.warn("Exception on device context close. ", e);
}
}
final DeviceState deviceState = createDeviceState(connectionContext);
final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker,
- hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary);
+ hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, switchFeaturesMandatory);
deviceContext.addDeviceContextClosedHandler(this);
Verify.verify(deviceContexts.putIfAbsent(connectionContext.getNodeId(), deviceContext) == null);
}
@Override
- public void setExtensionConverterProvider(ExtensionConverterProvider extensionConverterProvider) {
+ public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
this.extensionConverterProvider = extensionConverterProvider;
}
private boolean flowStatisticsAvailable;
private boolean tableStatisticsAvailable;
private boolean portStatisticsAvailable;
+ private boolean statPollEnabled;
private boolean queueStatisticsAvailable;
private volatile OfpRole role;
this.nodeId = Preconditions.checkNotNull(nodeId);
nodeII = DeviceStateUtil.createNodeInstanceIdentifier(nodeId);
version = featuresReply.getVersion();
+ statPollEnabled = false;
+ deviceSynchronized = false;
}
@Override
public void setRole(OfpRole role) {
this.role = role;
}
+
+ @Override
+ public boolean isStatisticsPollingEnabled() {
+ return statPollEnabled;
+ }
+
+ @Override
+ public void setStatisticsPollingEnabledProp(final boolean statPollEnabled) {
+ this.statPollEnabled = statPollEnabled;
+ }
}
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.FutureFallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
* transactions. Call this method for MASTER role only.
*/
public void activateTransactionManager() {
- LOG.trace("activetTransactionManaager for node {} transaction submit is set to {}", deviceState.getNodeId());
+ LOG.trace("activateTransactionManager for node {} transaction submit is set to {}", deviceState.getNodeId());
synchronized (txLock) {
if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) {
LOG.debug("Transaction Factory create {}", deviceState.getNodeId());
wTx.merge(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
future = wTx.submit();
wTx = null;
+ Futures.withFallback(future, new FutureFallback<Void>() {
+
+ @Override
+ public ListenableFuture<Void> create(final Throwable t) throws Exception {
+ final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
+ delWtx.put(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
+ return delWtx.submit();
+ }
+ });
}
return future;
}
public void initialization() throws CandidateAlreadyRegisteredException {
state = ROLE_CONTEXT_STATE.STARTING;
LOG.debug("Initialization requestOpenflowEntityOwnership for entity {}", entity);
- entityOwnershipCandidateRegistration = entityOwnershipService.registerCandidate(entity);
- LOG.debug("RoleContextImpl : Candidate registered with ownership service for device :{}", deviceContext
- .getPrimaryConnectionContext().getNodeId().getValue());
+ entityOwnershipCandidateRegistration = entityOwnershipService.registerCandidate(entity);
+ LOG.debug("RoleContextImpl : Candidate registered with ownership service for device :{}",
+ deviceContext.getPrimaryConnectionContext().getNodeId().getValue());
}
@Override
LOG.debug("Role change received from ownership listener from {} to {} for device:{}", oldRole, newRole,
deviceContext.getPrimaryConnectionContext().getNodeId());
- final SetRoleInput setRoleInput = (new SetRoleInputBuilder())
- .setControllerRole(newRole)
- .setNode(new NodeRef(deviceContext.getDeviceState().getNodeInstanceIdentifier()))
- .build();
-
- final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture = salRoleService.setRole(setRoleInput);
-
- return Futures.transform(JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture),
- new AsyncFunction<RpcResult<SetRoleOutput>, Void>() {
- @Override
- public ListenableFuture<Void> apply(final RpcResult<SetRoleOutput> setRoleOutputRpcResult) throws Exception {
- LOG.debug("Rolechange {} successful made on switch :{}", newRole, deviceContext.getDeviceState().getNodeId());
- final ListenableFuture<Void> nextStepFuture;
- switch (state) {
- case STARTING:
- if (OfpRole.BECOMESLAVE.equals(newRole)) {
- getDeviceState().setRole(newRole);
- nextStepFuture = Futures.immediateFuture(null);
- } else if (OfpRole.BECOMEMASTER.equals(newRole)) {
- nextStepFuture = deviceContext.onClusterRoleChange(newRole);
- } else {
- nextStepFuture = Futures.immediateFuture(null);
- }
-
- break;
- case WORKING:
- nextStepFuture = deviceContext.onClusterRoleChange(newRole);
- break;
- //case TEARING_DOWN:
- default:
- nextStepFuture = Futures.immediateFuture(null);
- break;
- }
-
- return nextStepFuture;
+
+ final AsyncFunction<RpcResult<SetRoleOutput>, Void> roleChangeRpcFunction = new AsyncFunction<RpcResult<SetRoleOutput>, Void>() {
+ @Override
+ public ListenableFuture<Void> apply(final RpcResult<SetRoleOutput> setRoleOutputRpcResult) throws Exception {
+ LOG.debug("Rolechange {} successful made on switch :{}", newRole, deviceContext.getDeviceState().getNodeId());
+ final ListenableFuture<Void> nextStepFuture;
+ switch (state) {
+ case STARTING:
+ if (OfpRole.BECOMESLAVE.equals(newRole)) {
+ getDeviceState().setRole(newRole);
+ nextStepFuture = Futures.immediateFuture(null);
+ } else if (OfpRole.BECOMEMASTER.equals(newRole)) {
+ nextStepFuture = deviceContext.onClusterRoleChange(oldRole, newRole);
+ } else {
+ nextStepFuture = Futures.immediateFuture(null);
}
- });
+
+ break;
+ case WORKING:
+ nextStepFuture = deviceContext.onClusterRoleChange(oldRole, newRole);
+ break;
+ //case TEARING_DOWN:
+ default:
+ nextStepFuture = Futures.immediateFuture(null);
+ break;
+ }
+
+ return nextStepFuture;
+ }
+ };
+
+ return sendRoleChangeToDevice(newRole, roleChangeRpcFunction);
}
@Override
@Nullable
@Override
public <T> RequestContext<T> createRequestContext() {
- final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.getReservedXid()) {
+ final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.reservedXidForDeviceMessage()) {
@Override
public void close() {
}
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.base.Verify;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.CheckedFuture;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
@Override
public void onDeviceContextLevelUp(@CheckForNull final DeviceContext deviceContext) throws Exception {
LOG.debug("RoleManager called for device:{}", deviceContext.getPrimaryConnectionContext().getNodeId());
- if (deviceContext.getDeviceState().getFeatures().getVersion() < OFConstants.OFP_VERSION_1_3) {
- // Roles are not supported before OF1.3, so move forward.
- deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
- return;
- }
final RoleContext roleContext = new RoleContextImpl(deviceContext, entityOwnershipService,
makeEntity(deviceContext.getDeviceState().getNodeId()),
@Override
public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
Preconditions.checkArgument(ownershipChange != null);
+ RoleContext roleCtxForClose = null;
try {
final RoleContext roleContext = contexts.get(ownershipChange.getEntity());
if (roleContext != null) {
+ roleCtxForClose = roleContext;
changeForEntity(ownershipChange, roleContext);
return;
}
final RoleContext txRoleContext = txContexts.get(ownershipChange.getEntity());
if (txRoleContext != null) {
+ roleCtxForClose = txRoleContext;
changeForTxEntity(ownershipChange, txRoleContext);
return;
}
} catch (final InterruptedException e) {
LOG.warn("fail to acquire semaphore: {}", ownershipChange.getEntity());
- // FIXME: consider forcibly closing this connection
+ if (roleCtxForClose != null) {
+ roleCtxForClose.close();
+ }
}
LOG.debug("We are not able to find Entity {} ownershipChange {} - disregarding ownership notification",
try {
LOG.debug("Node {} is marked as LEADER", nodeId);
Verify.verify(txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext) == null,
- "RoleCtx for TxEntity {} master Node {} is still not closed.",
- roleContext.getTxEntity(), nodeId);
+ "RoleCtx for TxEntity {} master Node {} is still not closed.", roleContext.getTxEntity(), nodeId);
roleContext.setPropagatingRole(OfpRole.BECOMEMASTER);
// try to register tx-candidate via ownership service
roleContext.setupTxCandidate();
} catch (final CandidateAlreadyRegisteredException e) {
- LOG.debug("txCandidate registration failed");
+ LOG.warn("txCandidate registration failed {}", roleContext.getDeviceState().getNodeId(), e);
// --- CLEAN UP ---
// withdraw context from map in order to have it as before
txContexts.remove(roleContext.getTxEntity(), roleContext);
// no more propagating any role - there is no txCandidate lock approaching
roleContext.setPropagatingRole(null);
-
- Throwables.propagate(e);
+ roleContext.getDeviceContext().close();
}
return null;
}
final Semaphore mainCandidateGuard = roleContext.getMainCandidateGuard();
LOG.trace("mainCandidate lock queue: " + mainCandidateGuard.getQueueLength());
mainCandidateGuard.acquire();
- //FIXME : check again implementation for double candidate scenario
LOG.info("Received EntityOwnershipChange:{}", ownershipChange);
if (roleContext.getDeviceState().isValid()) {
} else {
txProcessCallback = null;
}
- } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.isOwner()) {
+ } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.hasOwner()) {
// SLAVE -> MASTER
txProcessCallback = makeTxEntitySetupCallback(roleContext);
+ } else if (!ownershipChange.wasOwner() && !ownershipChange.isOwner() && ownershipChange.hasOwner()) {
+ if (RoleContext.ROLE_CONTEXT_STATE.STARTING.equals(roleContext.getState())) {
+ rolePropagationFx = roleContext.onRoleChanged(oldRole, newRole);
+ }
+ txProcessCallback = null;
} else {
LOG.trace("Main candidate role change case not covered: {} -> {} .. NOOP", oldRole, newRole);
txProcessCallback = null;
// catching result
Futures.addCallback(rolePropagationFx, new FutureCallback<Void>() {
- @Override
- public void onSuccess(@Nullable final Void aVoid) {
- LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}",
- ownershipChange.getEntity(), oldRole, newRole);
- mainCandidateGuard.release();
- }
+ @Override
+ public void onSuccess(@Nullable final Void aVoid) {
+ LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}",
+ ownershipChange.getEntity(), oldRole, newRole);
+ roleContext.setPropagatingRole(newRole);
+ mainCandidateGuard.release();
+ }
- @Override
- public void onFailure(final Throwable throwable) {
- LOG.warn("Main candidate role propagation FAILED for entity: {}, {} -> {}",
- ownershipChange.getEntity(), oldRole, newRole);
- mainCandidateGuard.release();
- roleContext.getDeviceContext().close();
- }
- }
- );
+ @Override
+ public void onFailure(final Throwable throwable) {
+ LOG.warn("Main candidate role propagation FAILED for entity: {}, {} -> {}",
+ ownershipChange.getEntity(), oldRole, newRole);
+ mainCandidateGuard.release();
+ roleContext.getDeviceContext().close();
+ }
+ });
} else {
LOG.debug("We are closing connection for entity {}", ownershipChange.getEntity());
return delFuture;
}
- private void unregistrationHelper(final EntityOwnershipChange ownershipChange, final RoleChangeListener roleChangeListener) {
+
+ private void unregistrationHelper(final EntityOwnershipChange ownershipChange, final RoleContext roleContext) {
LOG.info("Initiate removal from operational. Possibly the last node to be disconnected for :{}. ", ownershipChange);
- Futures.addCallback(removeDeviceFromOperDS(roleChangeListener), new FutureCallback<Void>() {
+ Futures.addCallback(removeDeviceFromOperDS(roleContext), new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable final Void aVoid) {
- LOG.debug("Freeing roleContext slot for device: {}", roleChangeListener.getDeviceState().getNodeId());
- contexts.remove(ownershipChange.getEntity(), roleChangeListener);
- ((RoleContext) roleChangeListener).suspendTxCandidate();
+ LOG.debug("Freeing roleContext slot for device: {}", roleContext.getDeviceState().getNodeId());
+ contexts.remove(ownershipChange.getEntity(), roleContext);
+ roleContext.suspendTxCandidate();
}
@Override
public void onFailure(final Throwable throwable) {
- LOG.warn("NOT freeing roleContext slot for device: {}, {}", roleChangeListener.getDeviceState()
+ LOG.warn("NOT freeing roleContext slot for device: {}, {}", roleContext.getDeviceState()
.getNodeId(), throwable.getMessage());
- contexts.remove(ownershipChange.getEntity(), roleChangeListener);
- ((RoleContext) roleChangeListener).suspendTxCandidate();
+ contexts.remove(ownershipChange.getEntity(), roleContext);
+ roleContext.suspendTxCandidate();
}
});
}
LOG.info("Acquired semaphore for {}, available permits:{} ", deviceContext.getDeviceState().getNodeId(), tracker.availablePermits());
}
- final Long xid = deviceContext.getReservedXid();
+ final Long xid = deviceContext.reservedXidForDeviceMessage();
if (xid == null) {
LOG.error("Xid cannot be reserved for new RequestContext, node:{}", deviceContext.getDeviceState().getNodeId());
}
- return new AbstractRequestContext<T>(deviceContext.getReservedXid()) {
+ return new AbstractRequestContext<T>(deviceContext.reservedXidForDeviceMessage()) {
@Override
public void close() {
tracker.release();
@Override
public void onDeviceContextLevelUp(final DeviceContext deviceContext) throws Exception {
- NodeId nodeId = deviceContext.getDeviceState().getNodeId();
- OfpRole ofpRole = deviceContext.getDeviceState().getRole();
+ final NodeId nodeId = deviceContext.getDeviceState().getNodeId();
+ final OfpRole ofpRole = deviceContext.getDeviceState().getRole();
LOG.debug("Node:{}, deviceContext.getDeviceState().getRole():{}", nodeId, ofpRole);
contexts.put(deviceContext, rpcContext);
}
+ deviceContext.addDeviceContextClosedHandler(this);
- if (ofpRole == OfpRole.BECOMESLAVE) {
- // if slave, we need to de-register rpcs if any have been registered, in case of master to slave
- LOG.info("Unregistering RPC registration (if any) for slave role for node:{}", deviceContext.getDeviceState().getNodeId());
- try {
- MdSalRegistratorUtils.unregisterServices(rpcContext);
- } catch (Exception e) {
- LOG.error("Exception while unregistering rpcs for slave role for node:{}. But continuing.", nodeId, e);
- }
-
- } else {
+ if (OfpRole.BECOMEMASTER.equals(ofpRole)) {
LOG.info("Registering Openflow RPCs for node:{}, role:{}", nodeId, ofpRole);
- MdSalRegistratorUtils.registerServices(rpcContext, deviceContext, ofpRole);
+ MdSalRegistratorUtils.registerMasterServices(rpcContext, deviceContext, ofpRole);
if (isStatisticsRpcEnabled) {
MdSalRegistratorUtils.registerStatCompatibilityServices(rpcContext, deviceContext,
notificationPublishService, new AtomicLong());
}
+ } else if(OfpRole.BECOMESLAVE.equals(ofpRole)) {
+ // if slave, we need to de-register rpcs if any have been registered, in case of master to slave
+ LOG.info("Unregistering RPC registration (if any) for slave role for node:{}", deviceContext.getDeviceState().getNodeId());
+ MdSalRegistratorUtils.registerSlaveServices(rpcContext, ofpRole);
+ } else {
+ // if we don't know role, we need to unregister rpcs if any have been registered
+ LOG.info("Unregistering RPC registration (if any) for slave role for node:{}", deviceContext.getDeviceState().getNodeId());
+ MdSalRegistratorUtils.unregisterServices(rpcContext);
}
- deviceContext.addDeviceContextClosedHandler(this);
-
// finish device initialization cycle back to DeviceManager
deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
}
@Override
- public void onDeviceContextClosed(DeviceContext deviceContext) {
- RpcContext removedContext = contexts.remove(deviceContext);
+ public void onDeviceContextClosed(final DeviceContext deviceContext) {
+ final RpcContext removedContext = contexts.remove(deviceContext);
if (removedContext != null) {
try {
LOG.info("Unregistering rpcs for device context closure");
removedContext.close();
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.error("Exception while unregistering rpcs onDeviceContextClosed handler for node:{}. But continuing.",
deviceContext.getDeviceState().getNodeId(), e);
}
}
}
- public void setStatisticsRpcEnabled(boolean isStatisticsRpcEnabled) {
+ @Override
+ public void setStatisticsRpcEnabled(final boolean isStatisticsRpcEnabled) {
this.isStatisticsRpcEnabled = isStatisticsRpcEnabled;
}
@Override
- public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
+ public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
this.notificationPublishService = notificationPublishService;
}
}
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Iterator;
import java.util.List;
import javax.annotation.CheckForNull;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
private final DeviceContext deviceContext;
private final DeviceState devState;
private final ListenableFuture<Boolean> emptyFuture;
- private final List<MultipartType> collectingStatType;
+ private final boolean shuttingDownStatisticsPolling;
+ private final Object COLLECTION_STAT_TYPE_LOCK = new Object();
+ @GuardedBy("COLLECTION_STAT_TYPE_LOCK")
+ private List<MultipartType> collectingStatType;
private StatisticsGatheringService statisticsGatheringService;
private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService;
private Timeout pollTimeout;
- public StatisticsContextImpl(@CheckForNull final DeviceContext deviceContext) {
+ public StatisticsContextImpl(@CheckForNull final DeviceContext deviceContext,
+ final boolean shuttingDownStatisticsPolling) {
this.deviceContext = Preconditions.checkNotNull(deviceContext);
- devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
+ this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
+ this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling;
emptyFuture = Futures.immediateFuture(new Boolean(false));
statisticsGatheringService = new StatisticsGatheringService(this, deviceContext);
statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext);
-
- final List<MultipartType> statListForCollecting = new ArrayList<>();
- if (devState.isTableStatisticsAvailable()) {
- statListForCollecting.add(MultipartType.OFPMPTABLE);
- }
- if (devState.isFlowStatisticsAvailable()) {
- statListForCollecting.add(MultipartType.OFPMPFLOW);
- }
- if (devState.isGroupAvailable()) {
- statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
- statListForCollecting.add(MultipartType.OFPMPGROUP);
- }
- if (devState.isMetersAvailable()) {
- statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
- statListForCollecting.add(MultipartType.OFPMPMETER);
- }
- if (devState.isPortStatisticsAvailable()) {
- statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
- }
- if (devState.isQueueStatisticsAvailable()) {
- statListForCollecting.add(MultipartType.OFPMPQUEUE);
- }
- collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
+ statListForCollectingInitialization();
+ deviceContext.setStatisticsContext(StatisticsContextImpl.this);
}
@Override
- public ListenableFuture<Boolean> gatherDynamicData() {
- final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
- if (errorResultFuture != null) {
- return errorResultFuture;
+ public void statListForCollectingInitialization() {
+ synchronized (COLLECTION_STAT_TYPE_LOCK) {
+ final List<MultipartType> statListForCollecting = new ArrayList<>();
+ if (devState.isTableStatisticsAvailable()) {
+ statListForCollecting.add(MultipartType.OFPMPTABLE);
+ }
+ if (devState.isFlowStatisticsAvailable()) {
+ statListForCollecting.add(MultipartType.OFPMPFLOW);
+ }
+ if (devState.isGroupAvailable()) {
+ statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
+ statListForCollecting.add(MultipartType.OFPMPGROUP);
+ }
+ if (devState.isMetersAvailable()) {
+ statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
+ statListForCollecting.add(MultipartType.OFPMPMETER);
+ }
+ if (devState.isPortStatisticsAvailable()) {
+ statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
+ }
+ if (devState.isQueueStatisticsAvailable()) {
+ statListForCollecting.add(MultipartType.OFPMPQUEUE);
+ }
+ collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
}
- final Iterator<MultipartType> statIterator = collectingStatType.iterator();
- final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
- statChainFuture(statIterator, settableStatResultFuture);
- return settableStatResultFuture;
}
- private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType) {
- switch (multipartType) {
- case OFPMPFLOW:
- return collectFlowStatistics(multipartType);
- case OFPMPTABLE:
- return collectTableStatistics(multipartType);
- case OFPMPPORTSTATS:
- return collectPortStatistics(multipartType);
- case OFPMPQUEUE:
- return collectQueueStatistics(multipartType);
- case OFPMPGROUPDESC:
- return collectGroupDescStatistics(multipartType);
- case OFPMPGROUP:
- return collectGroupStatistics(multipartType);
- case OFPMPMETERCONFIG:
- return collectMeterConfigStatistics(multipartType);
- case OFPMPMETER:
- return collectMeterStatistics(multipartType);
- default:
- LOG.warn("Unsuported Statistics type {}", multipartType);
- return Futures.immediateCheckedFuture(Boolean.TRUE);
+ @Override
+ public ListenableFuture<Boolean> gatherDynamicData() {
+ if (shuttingDownStatisticsPolling) {
+ LOG.debug("Statistics for device {} is not enabled.", deviceContext.getDeviceState().getNodeId());
+ return Futures.immediateFuture(Boolean.TRUE);
+ }
+ final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
+ if (errorResultFuture != null) {
+ return errorResultFuture;
+ }
+ synchronized (COLLECTION_STAT_TYPE_LOCK) {
+ final Iterator<MultipartType> statIterator = collectingStatType.iterator();
+ final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
+ statChainFuture(statIterator, settableStatResultFuture);
+ return settableStatResultFuture;
+ }
}
- }
- @Override
- public <T> RequestContext<T> createRequestContext() {
- final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.getReservedXid()) {
- @Override
- public void close() {
- requestContexts.remove(this);
+ private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType){
+ switch (multipartType) {
+ case OFPMPFLOW:
+ return collectFlowStatistics(multipartType);
+ case OFPMPTABLE:
+ return collectTableStatistics(multipartType);
+ case OFPMPPORTSTATS:
+ return collectPortStatistics(multipartType);
+ case OFPMPQUEUE:
+ return collectQueueStatistics(multipartType);
+ case OFPMPGROUPDESC:
+ return collectGroupDescStatistics(multipartType);
+ case OFPMPGROUP:
+ return collectGroupStatistics(multipartType);
+ case OFPMPMETERCONFIG:
+ return collectMeterConfigStatistics(multipartType);
+ case OFPMPMETER:
+ return collectMeterStatistics(multipartType);
+ default:
+ LOG.warn("Unsuported Statistics type {}", multipartType);
+ return Futures.immediateCheckedFuture(Boolean.TRUE);
}
- };
- requestContexts.add(ret);
- return ret;
- }
-
- @Override
- public void close() {
- for (final RequestContext<?> requestContext : requestContexts) {
- RequestContextUtil.closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED);
}
- if (null != pollTimeout && !pollTimeout.isExpired()) {
- pollTimeout.cancel();
- }
- }
- @Override
- public void setPollTimeout(Timeout pollTimeout) {
- this.pollTimeout = pollTimeout;
- }
- @Override
- public Optional<Timeout> getPollTimeout() {
- return Optional.fromNullable(pollTimeout);
- }
+ @Override
+ public <T> RequestContext<T> createRequestContext() {
+ final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.reservedXidForDeviceMessage()) {
+ @Override
+ public void close() {
+ requestContexts.remove(this);
+ }
+ };
+ requestContexts.add(ret);
+ return ret;
+ }
+
+ @Override
+ public void close () {
+ for (final RequestContext<?> requestContext : requestContexts) {
+ RequestContextUtil.closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED);
+ }
+ if (null != pollTimeout && !pollTimeout.isExpired()) {
+ pollTimeout.cancel();
+ }
+ }
- void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture) {
- if ( ! iterator.hasNext()) {
- resultFuture.set(Boolean.TRUE);
- LOG.debug("Stats collection successfully finished for node {}", deviceContext.getDeviceState().getNodeId());
- return;
+ @Override
+ public void setPollTimeout (Timeout pollTimeout){
+ this.pollTimeout = pollTimeout;
}
- final MultipartType nextType = iterator.next();
- LOG.debug("Stats iterating to next type for node {} of type {}", deviceContext.getDeviceState().getNodeId(), nextType);
+ @Override
+ public Optional<Timeout> getPollTimeout () {
+ return Optional.fromNullable(pollTimeout);
+ }
- final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType);
- Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
- @Override
- public void onSuccess(final Boolean result) {
- statChainFuture(iterator, resultFuture);
+ void statChainFuture ( final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture){
+ if (!iterator.hasNext()) {
+ resultFuture.set(Boolean.TRUE);
+ LOG.debug("Stats collection successfully finished for node {}", deviceContext.getDeviceState().getNodeId());
+ return;
}
- @Override
- public void onFailure(final Throwable t) {
- resultFuture.setException(t);
- }
- });
- }
- /**
- * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
- * which has to be returned from caller too
- *
- * @return
- */
- @VisibleForTesting
- ListenableFuture<Boolean> deviceConnectionCheck() {
- if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
- ListenableFuture<Boolean> resultingFuture = SettableFuture.create();
- switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
- case RIP:
- final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
- deviceContext.getPrimaryConnectionContext().getConnectionState());
- resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
- break;
- default:
- resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
- break;
+ final MultipartType nextType = iterator.next();
+ LOG.debug("Stats iterating to next type for node {} of type {}", deviceContext.getDeviceState().getNodeId(), nextType);
+
+ final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType);
+ Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
+ @Override
+ public void onSuccess(final Boolean result) {
+ statChainFuture(iterator, resultFuture);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ resultFuture.setException(t);
+ }
+ });
+ }
+
+ /**
+ * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
+ * which has to be returned from caller too
+ *
+ * @return
+ */
+ @VisibleForTesting
+ ListenableFuture<Boolean> deviceConnectionCheck () {
+ if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
+ ListenableFuture<Boolean> resultingFuture = SettableFuture.create();
+ switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
+ case RIP:
+ final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
+ deviceContext.getPrimaryConnectionContext().getConnectionState());
+ resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
+ break;
+ default:
+ resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
+ break;
+ }
+ return resultingFuture;
}
- return resultingFuture;
+ return null;
}
- return null;
- }
- private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType) {
- return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
- statisticsGatheringOnTheFlyService, deviceContext, /*MultipartType.OFPMPFLOW*/ multipartType) : emptyFuture;
- }
+ private ListenableFuture<Boolean> collectFlowStatistics ( final MultipartType multipartType){
+ return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+ statisticsGatheringOnTheFlyService, deviceContext, /*MultipartType.OFPMPFLOW*/ multipartType) : emptyFuture;
+ }
- private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
- return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
- statisticsGatheringService, deviceContext, /*MultipartType.OFPMPTABLE*/ multipartType) : emptyFuture;
- }
+ private ListenableFuture<Boolean> collectTableStatistics ( final MultipartType multipartType){
+ return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+ statisticsGatheringService, deviceContext, /*MultipartType.OFPMPTABLE*/ multipartType) : emptyFuture;
+ }
- private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
- return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
- statisticsGatheringService, deviceContext, /*MultipartType.OFPMPPORTSTATS*/ multipartType) : emptyFuture;
- }
+ private ListenableFuture<Boolean> collectPortStatistics ( final MultipartType multipartType){
+ return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+ statisticsGatheringService, deviceContext, /*MultipartType.OFPMPPORTSTATS*/ multipartType) : emptyFuture;
+ }
- private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
- return devState.isQueueStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
- statisticsGatheringService, deviceContext, /*MultipartType.OFPMPQUEUE*/ multipartType) : emptyFuture;
- }
+ private ListenableFuture<Boolean> collectQueueStatistics ( final MultipartType multipartType){
+ return devState.isQueueStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+ statisticsGatheringService, deviceContext, /*MultipartType.OFPMPQUEUE*/ multipartType) : emptyFuture;
+ }
- private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
- return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
- statisticsGatheringService, deviceContext, /*MultipartType.OFPMPGROUPDESC*/ multipartType) : emptyFuture;
- }
+ private ListenableFuture<Boolean> collectGroupDescStatistics ( final MultipartType multipartType){
+ return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+ statisticsGatheringService, deviceContext, /*MultipartType.OFPMPGROUPDESC*/ multipartType) : emptyFuture;
+ }
- private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
- return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
- statisticsGatheringService, deviceContext, /*MultipartType.OFPMPGROUP*/ multipartType) : emptyFuture;
- }
+ private ListenableFuture<Boolean> collectGroupStatistics ( final MultipartType multipartType){
+ return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+ statisticsGatheringService, deviceContext, /*MultipartType.OFPMPGROUP*/ multipartType) : emptyFuture;
+ }
- private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
- return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
- statisticsGatheringService, deviceContext, /*MultipartType.OFPMPMETERCONFIG*/ multipartType) : emptyFuture;
- }
+ private ListenableFuture<Boolean> collectMeterConfigStatistics ( final MultipartType multipartType){
+ return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+ statisticsGatheringService, deviceContext, /*MultipartType.OFPMPMETERCONFIG*/ multipartType) : emptyFuture;
+ }
- private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
- return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
- statisticsGatheringService, deviceContext, /*MultipartType.OFPMPMETER*/ multipartType) : emptyFuture;
- }
+ private ListenableFuture<Boolean> collectMeterStatistics ( final MultipartType multipartType){
+ return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+ statisticsGatheringService, deviceContext, /*MultipartType.OFPMPMETER*/ multipartType) : emptyFuture;
+ }
- @VisibleForTesting
- protected void setStatisticsGatheringService(StatisticsGatheringService statisticsGatheringService) {
- this.statisticsGatheringService = statisticsGatheringService;
- }
+ @VisibleForTesting
+ protected void setStatisticsGatheringService ( final StatisticsGatheringService statisticsGatheringService){
+ this.statisticsGatheringService = statisticsGatheringService;
+ }
- @VisibleForTesting
- protected void setStatisticsGatheringOnTheFlyService(StatisticsGatheringOnTheFlyService
- statisticsGatheringOnTheFlyService) {
- this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
- }
+ @VisibleForTesting
+ protected void setStatisticsGatheringOnTheFlyService ( final StatisticsGatheringOnTheFlyService
+ statisticsGatheringOnTheFlyService){
+ this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
+ }
- @Override
- public ItemLifecycleListener getItemLifeCycleListener() {
- return itemLifeCycleListener;
+ @Override
+ public ItemLifecycleListener getItemLifeCycleListener () {
+ return itemLifeCycleListener;
+ }
}
-}
package org.opendaylight.openflowplugin.impl.statistics;
+import javax.annotation.CheckForNull;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
public class StatisticsManagerImpl implements StatisticsManager, StatisticsManagerControlService {
private static final Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
- private final RpcProviderRegistry rpcProviderRegistry;
+
+ private static final long DEFAULT_STATS_TIMEOUT_SEC = 50L;
private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
private static long maximumTimerDelay = 900000; //wait max 15 minutes for next statistics
private StatisticsWorkMode workMode = StatisticsWorkMode.COLLECTALL;
- private Semaphore workModeGuard = new Semaphore(1, true);
+ private final Semaphore workModeGuard = new Semaphore(1, true);
private boolean shuttingDownStatisticsPolling;
private BindingAwareBroker.RpcRegistration<StatisticsManagerControlService> controlServiceRegistration;
deviceInitPhaseHandler = handler;
}
- public StatisticsManagerImpl(RpcProviderRegistry rpcProviderRegistry) {
- this.rpcProviderRegistry = rpcProviderRegistry;
+ public StatisticsManagerImpl(@CheckForNull final RpcProviderRegistry rpcProviderRegistry, final boolean shuttingDownStatisticsPolling) {
+ Preconditions.checkArgument(rpcProviderRegistry != null);
controlServiceRegistration = rpcProviderRegistry.addRpcImplementation(StatisticsManagerControlService.class, this);
- }
-
- public StatisticsManagerImpl(RpcProviderRegistry rpcProviderRegistry, final boolean shuttingDownStatisticsPolling) {
- this(rpcProviderRegistry);
this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling;
}
LOG.trace("This is first device that delivered timer. Starting statistics polling immediately.");
hashedWheelTimer = deviceContext.getTimer();
}
-
- LOG.info("Starting Statistics for master role for node:{}", deviceContext.getDeviceState().getNodeId());
-
- final StatisticsContext statisticsContext = new StatisticsContextImpl(deviceContext);
+ final StatisticsContext statisticsContext = new StatisticsContextImpl(deviceContext, shuttingDownStatisticsPolling);
deviceContext.addDeviceContextClosedHandler(this);
- if (deviceContext.getDeviceState().getRole() == OfpRole.BECOMESLAVE) {
- // if slave, we dont poll for statistics and jump to rpc initialization
- LOG.info("Skipping Statistics for slave role for node:{}", deviceContext.getDeviceState().getNodeId());
+ if (shuttingDownStatisticsPolling) {
+ LOG.info("Statistics is shutdown for node:{}", deviceContext.getDeviceState().getNodeId());
+ } else {
+ LOG.info("Schedule Statistics poll for node:{}", deviceContext.getDeviceState().getNodeId());
+ if (OfpRole.BECOMEMASTER.equals(deviceContext.getDeviceState().getRole())) {
+ initialStatPollForMaster(statisticsContext, deviceContext);
+ /* we want to wait for initial statCollecting response */
+ return;
+ }
scheduleNextPolling(deviceContext, statisticsContext, new TimeCounter());
- deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
- return;
}
+ contexts.put(deviceContext, statisticsContext);
+ deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
+ deviceContext.getDeviceState().setDeviceSynchronized(true);
+ }
+ private void initialStatPollForMaster(final StatisticsContext statisticsContext, final DeviceContext deviceContext) {
final ListenableFuture<Boolean> weHaveDynamicData = statisticsContext.gatherDynamicData();
Futures.addCallback(weHaveDynamicData, new FutureCallback<Boolean>() {
@Override
//there are some statistics on device worth gathering
contexts.put(deviceContext, statisticsContext);
final TimeCounter timeCounter = new TimeCounter();
+ deviceContext.getDeviceState().setStatisticsPollingEnabledProp(true);
scheduleNextPolling(deviceContext, statisticsContext, timeCounter);
LOG.trace("Device dynamic info collecting done. Going to announce raise to next level.");
try {
deviceContext.getDeviceState().setDeviceSynchronized(true);
} else {
final String deviceAdress = deviceContext.getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress().toString();
- try {
- deviceContext.close();
- } catch (Exception e) {
- LOG.info("Statistics for device {} could not be gathered. Closing its device context.", deviceAdress);
- }
+ LOG.info("Statistics for device {} could not be gathered. Closing its device context.", deviceAdress);
+ deviceContext.close();
}
}
@Override
public void onFailure(final Throwable throwable) {
LOG.warn("Statistics manager was not able to collect dynamic info for device.", deviceContext.getDeviceState().getNodeId(), throwable);
- try {
- deviceContext.close();
- } catch (Exception e) {
- LOG.warn("Error closing device context.", e);
- }
+ deviceContext.close();
}
});
}
LOG.debug("Session for device {} is not valid.", deviceContext.getDeviceState().getNodeId().getValue());
return;
}
- LOG.debug("POLLING ALL STATS for device: {}", deviceContext.getDeviceState().getNodeId().getValue());
+ if (!deviceContext.getDeviceState().isStatisticsPollingEnabled()) {
+ LOG.debug("StatisticsPolling is disabled for device: {} , try later", deviceContext.getDeviceState().getNodeId());
+ scheduleNextPolling(deviceContext, statisticsContext, timeCounter);
+ return;
+ }
if (OfpRole.BECOMESLAVE.equals(deviceContext.getDeviceState().getRole())) {
LOG.debug("Role is SLAVE so we don't want to poll any stat for device: {}", deviceContext.getDeviceState().getNodeId());
scheduleNextPolling(deviceContext, statisticsContext, timeCounter);
return;
}
+
+ LOG.debug("POLLING ALL STATS for device: {}", deviceContext.getDeviceState().getNodeId().getValue());
timeCounter.markStart();
- ListenableFuture<Boolean> deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
+ final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
@Override
public void onSuccess(final Boolean o) {
}
});
- final long STATS_TIMEOUT_SEC = 20L;
- try {
- deviceStatisticsCollectionFuture.get(STATS_TIMEOUT_SEC, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException e) {
- LOG.warn("Statistics collection for node {} failed", deviceContext.getDeviceState().getNodeId(), e);
- } catch (final TimeoutException e) {
- LOG.info("Statistics collection for node {} still in progress even after {} secs", deviceContext.getDeviceState().getNodeId(), STATS_TIMEOUT_SEC);
- }
+ final long averangeTime = TimeUnit.MILLISECONDS.toSeconds(timeCounter.getAverageTimeBetweenMarks());
+ final long STATS_TIMEOUT_SEC = averangeTime > 0 ? 3 * averangeTime : DEFAULT_STATS_TIMEOUT_SEC;
+ final TimerTask timerTask = new TimerTask() {
+
+ @Override
+ public void run(final Timeout timeout) throws Exception {
+ if (!deviceStatisticsCollectionFuture.isDone()) {
+ LOG.info("Statistics collection for node {} still in progress even after {} secs", deviceContext
+ .getDeviceState().getNodeId(), STATS_TIMEOUT_SEC);
+ deviceStatisticsCollectionFuture.cancel(true);
+ }
+ }
+ };
+ deviceContext.getTimer().newTimeout(timerTask, STATS_TIMEOUT_SEC, TimeUnit.SECONDS);
}
private void scheduleNextPolling(final DeviceContext deviceContext,
if (null != hashedWheelTimer) {
LOG.debug("SCHEDULING NEXT STATS POLLING for device: {}", deviceContext.getDeviceState().getNodeId().getValue());
if (!shuttingDownStatisticsPolling) {
- Timeout pollTimeout = hashedWheelTimer.newTimeout(new TimerTask() {
+ final Timeout pollTimeout = hashedWheelTimer.newTimeout(new TimerTask() {
@Override
public void run(final Timeout timeout) throws Exception {
pollStatistics(deviceContext, statisticsContext, timeCounter);
@VisibleForTesting
protected void calculateTimerDelay(final TimeCounter timeCounter) {
- long averageStatisticsGatheringTime = timeCounter.getAverageTimeBetweenMarks();
+ final long averageStatisticsGatheringTime = timeCounter.getAverageTimeBetweenMarks();
if (averageStatisticsGatheringTime > currentTimerDelay) {
currentTimerDelay *= 2;
if (currentTimerDelay > maximumTimerDelay) {
@Override
public void onDeviceContextClosed(final DeviceContext deviceContext) {
- StatisticsContext statisticsContext = contexts.remove(deviceContext);
+ final StatisticsContext statisticsContext = contexts.remove(deviceContext);
if (null != statisticsContext) {
LOG.trace("Removing device context from stack. No more statistics gathering for node {}", deviceContext.getDeviceState().getNodeId());
- try {
- statisticsContext.close();
- } catch (Exception e) {
- LOG.debug("Error closing statistic context for node {}.", deviceContext.getDeviceState().getNodeId());
- }
+ statisticsContext.close();
}
}
@Override
public Future<RpcResult<GetStatisticsWorkModeOutput>> getStatisticsWorkMode() {
- GetStatisticsWorkModeOutputBuilder smModeOutputBld = new GetStatisticsWorkModeOutputBuilder();
+ final GetStatisticsWorkModeOutputBuilder smModeOutputBld = new GetStatisticsWorkModeOutputBuilder();
smModeOutputBld.setMode(workMode);
return RpcResultBuilder.success(smModeOutputBld.build()).buildFuture();
}
if (!workMode.equals(targetWorkMode)) {
shuttingDownStatisticsPolling = StatisticsWorkMode.FULLYDISABLED.equals(targetWorkMode);
// iterate through stats-ctx: propagate mode
- for (Map.Entry<DeviceContext, StatisticsContext> contextEntry : contexts.entrySet()) {
+ for (final Map.Entry<DeviceContext, StatisticsContext> contextEntry : contexts.entrySet()) {
final DeviceContext deviceContext = contextEntry.getKey();
final StatisticsContext statisticsContext = contextEntry.getValue();
switch (targetWorkMode) {
case COLLECTALL:
scheduleNextPolling(deviceContext, statisticsContext, new TimeCounter());
- for (ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
+ for (final ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
lifeCycleSource.setItemLifecycleListener(null);
}
break;
if (pollTimeout.isPresent()) {
pollTimeout.get().cancel();
}
- for (ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
+ for (final ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
lifeCycleSource.setItemLifecycleListener(statisticsContext.getItemLifeCycleListener());
}
break;
controlServiceRegistration.close();
controlServiceRegistration = null;
}
+ for (final StatisticsContext statCtx : contexts.values()) {
+ statCtx.close();
+ }
+ contexts.clear();
}
}
final OutboundQueue queue = deviceContext.getPrimaryConnectionContext().getOutboundQueueProvider();
- final Long reserved = deviceContext.getReservedXid();
+ final Long reserved = deviceContext.reservedXidForDeviceMessage();
final RequestContext<List<MultipartReply>> requestContext = new AbstractRequestContext<List<MultipartReply>>(
reserved) {
@Override
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
import com.google.common.reflect.TypeToken;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.CheckForNull;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
throw new IllegalStateException();
}
+ /**
+ * Method registers all OF services for role {@link OfpRole#BECOMEMASTER}
+ * @param rpcContext - registration processing is implemented in {@link RpcContext}
+ * @param deviceContext - every service needs {@link DeviceContext} as input parameter
+ * @param newRole - role validation for {@link OfpRole#BECOMEMASTER}
+ */
+ public static void registerMasterServices(@CheckForNull final RpcContext rpcContext,
+ @CheckForNull final DeviceContext deviceContext, @CheckForNull final OfpRole newRole) {
+ Preconditions.checkArgument(rpcContext != null);
+ Preconditions.checkArgument(deviceContext != null);
+ Preconditions.checkArgument(newRole != null);
+ Verify.verify(OfpRole.BECOMEMASTER.equals(newRole), "Service call with bad Role {} we expect role BECOMEMASTER", newRole);
- public static void registerServices(final RpcContext rpcContext, final DeviceContext deviceContext, final OfpRole newRole) {
rpcContext.registerRpcServiceImplementation(SalEchoService.class, new SalEchoServiceImpl(rpcContext, deviceContext));
- if (OfpRole.BECOMEMASTER.equals(newRole)) {
- rpcContext.registerRpcServiceImplementation(SalFlowService.class, new SalFlowServiceImpl(rpcContext, deviceContext));
- //TODO: add constructors with rcpContext and deviceContext to meter, group, table constructors
- rpcContext.registerRpcServiceImplementation(FlowCapableTransactionService.class, new FlowCapableTransactionServiceImpl(rpcContext, deviceContext));
- rpcContext.registerRpcServiceImplementation(SalMeterService.class, new SalMeterServiceImpl(rpcContext, deviceContext));
- rpcContext.registerRpcServiceImplementation(SalGroupService.class, new SalGroupServiceImpl(rpcContext, deviceContext));
- rpcContext.registerRpcServiceImplementation(SalTableService.class, new SalTableServiceImpl(rpcContext, deviceContext));
- rpcContext.registerRpcServiceImplementation(SalPortService.class, new SalPortServiceImpl(rpcContext, deviceContext));
- rpcContext.registerRpcServiceImplementation(PacketProcessingService.class, new PacketProcessingServiceImpl(rpcContext, deviceContext));
- rpcContext.registerRpcServiceImplementation(NodeConfigService.class, new NodeConfigServiceImpl(rpcContext, deviceContext));
- rpcContext.registerRpcServiceImplementation(OpendaylightFlowStatisticsService.class, new OpendaylightFlowStatisticsServiceImpl(rpcContext, deviceContext));
- // TODO: experimenter symmetric and multipart message services
- rpcContext.registerRpcServiceImplementation(SalExperimenterMessageService.class, new SalExperimenterMessageServiceImpl(rpcContext, deviceContext));
+ rpcContext.registerRpcServiceImplementation(SalFlowService.class, new SalFlowServiceImpl(rpcContext, deviceContext));
+ //TODO: add constructors with rcpContext and deviceContext to meter, group, table constructors
+ rpcContext.registerRpcServiceImplementation(FlowCapableTransactionService.class, new FlowCapableTransactionServiceImpl(rpcContext, deviceContext));
+ rpcContext.registerRpcServiceImplementation(SalMeterService.class, new SalMeterServiceImpl(rpcContext, deviceContext));
+ rpcContext.registerRpcServiceImplementation(SalGroupService.class, new SalGroupServiceImpl(rpcContext, deviceContext));
+ rpcContext.registerRpcServiceImplementation(SalTableService.class, new SalTableServiceImpl(rpcContext, deviceContext));
+ rpcContext.registerRpcServiceImplementation(SalPortService.class, new SalPortServiceImpl(rpcContext, deviceContext));
+ rpcContext.registerRpcServiceImplementation(PacketProcessingService.class, new PacketProcessingServiceImpl(rpcContext, deviceContext));
+ rpcContext.registerRpcServiceImplementation(NodeConfigService.class, new NodeConfigServiceImpl(rpcContext, deviceContext));
+ rpcContext.registerRpcServiceImplementation(OpendaylightFlowStatisticsService.class, new OpendaylightFlowStatisticsServiceImpl(rpcContext, deviceContext));
+ // TODO: experimenter symmetric and multipart message services
+ rpcContext.registerRpcServiceImplementation(SalExperimenterMessageService.class, new SalExperimenterMessageServiceImpl(rpcContext, deviceContext));
+ }
+
+ /**
+ * Method unregisters all services in first step. So we don't need to call {@link MdSalRegistratorUtils#unregisterServices(RpcContext)}
+ * directly before by change role from {@link OfpRole#BECOMEMASTER} to {@link OfpRole#BECOMESLAVE}.
+ * Method registers {@link SalEchoService} in next step only because we would like to have SalEchoService as local service for all apps
+ * to be able actively check connection status for slave connection too.
+ * @param rpcContext - registration/unregistration processing is implemented in {@link RpcContext}
+ * @param newRole - role validation for {@link OfpRole#BECOMESLAVE}
+ */
+ public static void registerSlaveServices(@CheckForNull final RpcContext rpcContext, @CheckForNull final OfpRole newRole) {
+ Preconditions.checkArgument(rpcContext != null);
+ Preconditions.checkArgument(newRole != null);
+ Verify.verify(OfpRole.BECOMESLAVE.equals(newRole), "Service call with bad Role {} we expect role BECOMESLAVE", newRole);
- } else if (OfpRole.BECOMESLAVE.equals(newRole)) {
- rpcContext.unregisterRpcServiceImplementation(SalFlowService.class);
- //TODO: add constructors with rcpContext and deviceContext to meter, group, table constructors
- rpcContext.unregisterRpcServiceImplementation(FlowCapableTransactionService.class);
- rpcContext.unregisterRpcServiceImplementation(SalMeterService.class);
- rpcContext.unregisterRpcServiceImplementation(SalGroupService.class);
- rpcContext.unregisterRpcServiceImplementation(SalTableService.class);
- rpcContext.unregisterRpcServiceImplementation(SalPortService.class);
- rpcContext.unregisterRpcServiceImplementation(PacketProcessingService.class);
- rpcContext.unregisterRpcServiceImplementation(NodeConfigService.class);
- rpcContext.unregisterRpcServiceImplementation(OpendaylightFlowStatisticsService.class);
- // TODO: experimenter symmetric and multipart message services
- rpcContext.unregisterRpcServiceImplementation(SalExperimenterMessageService.class);
- }
+ unregisterServices(rpcContext);
}
- public static void unregisterServices(final RpcContext rpcContext) throws Exception {
- rpcContext.close();
+ /**
+ * Method unregisters all OF services.
+ * @param rpcContext - unregistration processing is implemented in {@link RpcContext}
+ */
+ public static void unregisterServices(@CheckForNull final RpcContext rpcContext) {
+ Preconditions.checkArgument(rpcContext != null);
+
+ rpcContext.unregisterRpcServiceImplementation(SalEchoService.class);
+ rpcContext.unregisterRpcServiceImplementation(SalFlowService.class);
+ //TODO: add constructors with rcpContext and deviceContext to meter, group, table constructors
+ rpcContext.unregisterRpcServiceImplementation(FlowCapableTransactionService.class);
+ rpcContext.unregisterRpcServiceImplementation(SalMeterService.class);
+ rpcContext.unregisterRpcServiceImplementation(SalGroupService.class);
+ rpcContext.unregisterRpcServiceImplementation(SalTableService.class);
+ rpcContext.unregisterRpcServiceImplementation(SalPortService.class);
+ rpcContext.unregisterRpcServiceImplementation(PacketProcessingService.class);
+ rpcContext.unregisterRpcServiceImplementation(NodeConfigService.class);
+ rpcContext.unregisterRpcServiceImplementation(OpendaylightFlowStatisticsService.class);
+ // TODO: experimenter symmetric and multipart message services
+ rpcContext.unregisterRpcServiceImplementation(SalExperimenterMessageService.class);
}
/**
rpcContext.lookupRpcService(OpendaylightFlowStatisticsService.class));
Preconditions.checkArgument(COMPOSITE_SERVICE_TYPE_TOKEN.isAssignableFrom(flowStatisticsService.getClass()));
// attach delegate to flow statistics service (to cover all but aggregated stats with match filter input)
- OpendaylightFlowStatisticsServiceDelegateImpl flowStatisticsDelegate =
+ final OpendaylightFlowStatisticsServiceDelegateImpl flowStatisticsDelegate =
new OpendaylightFlowStatisticsServiceDelegateImpl(rpcContext, deviceContext, notificationPublishService, new AtomicLong());
((Delegator<OpendaylightFlowStatisticsService>) flowStatisticsService).setDelegate(flowStatisticsDelegate);
org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved.class.getName()))))
.thenReturn(messageTranslatorFlowRemoved);
- deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary);
+ deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, false);
xid = new Xid(atomicLong.incrementAndGet());
xidMulti = new Xid(atomicLong.incrementAndGet());
@Test(expected = NullPointerException.class)
public void testDeviceContextImplConstructorNullDataBroker() throws Exception {
- new DeviceContextImpl(connectionContext, deviceState, null, timer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary).close();
+ new DeviceContextImpl(connectionContext, deviceState, null, timer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, false).close();
}
@Test(expected = NullPointerException.class)
public void testDeviceContextImplConstructorNullDeviceState() throws Exception {
- new DeviceContextImpl(connectionContext, null, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary).close();
+ new DeviceContextImpl(connectionContext, null, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, false).close();
}
@Test(expected = NullPointerException.class)
public void testDeviceContextImplConstructorNullTimer() throws Exception {
- new DeviceContextImpl(null, deviceState, dataBroker, null, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary).close();
+ new DeviceContextImpl(null, deviceState, dataBroker, null, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, false).close();
}
@Test
@Test
public void testGetReservedXid() {
- deviceContext.getReservedXid();
+ deviceContext.reservedXidForDeviceMessage();
verify(outboundQueueProvider).reserveEntry();
}
MessageIntelligenceAgency mockedMessageIntelligenceAgency = mock(MessageIntelligenceAgency.class);
DeviceManagerImpl deviceManager = new DeviceManagerImpl(mockedDataBroker, mockedMessageIntelligenceAgency,
- TEST_VALUE_GLOBAL_NOTIFICATION_QUOTA);
+ TEST_VALUE_GLOBAL_NOTIFICATION_QUOTA, false);
deviceManager.setDeviceInitializationPhaseHandler(deviceInitPhaseHandler);
return deviceManager;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
+import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
@Mock
private FeaturesReply featuresReply;
+ @Mock
+ private MessageSpy mockedMessageSpy;
private final NodeId nodeId = NodeId.getDefaultInstance("openflow:1");
private final KeyedInstanceIdentifier<Node, NodeKey> instanceIdentifier = DeviceStateUtil.createNodeInstanceIdentifier(nodeId);
public void setup() throws CandidateAlreadyRegisteredException {
when(deviceContext.getPrimaryConnectionContext()).thenReturn(connectionContext);
when(deviceContext.getDeviceState()).thenReturn(deviceState);
+ when(deviceContext.getMessageSpy()).thenReturn(mockedMessageSpy);
when(connectionContext.getNodeId()).thenReturn(nodeId);
when(deviceState.getNodeInstanceIdentifier()).thenReturn(instanceIdentifier);
when(deviceState.getNodeId()).thenReturn(nodeId);
when(rpcProviderRegistry.getRpcService(SalRoleService.class)).thenReturn(salRoleService);
when(deviceState.getFeatures()).thenReturn(getFeaturesOutput);
- when(getFeaturesOutput.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_3);
+ when(getFeaturesOutput.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_0);
when(deviceContext.getPrimaryConnectionContext().getFeatures()).thenReturn(featuresReply);
when(deviceContext.getPrimaryConnectionContext().getConnectionState()).thenReturn(ConnectionContext.CONNECTION_STATE.WORKING);
- when(deviceContext.onClusterRoleChange(Matchers.<OfpRole>any())).thenReturn(Futures.immediateFuture((Void) null));
+ when(deviceContext.onClusterRoleChange(Matchers.<OfpRole>any(), Matchers.<OfpRole>any()))
+ .thenReturn(Futures.immediateFuture((Void) null));
roleContext = new RoleContextImpl(deviceContext, entityOwnershipService, entity, txEntity);
roleContext.initialization();
final ListenableFuture<Void> onRoleChanged = roleContext.onRoleChanged(oldRole, newRole);
onRoleChanged.get(FUTURE_SAFETY_TIMEOUT, TimeUnit.SECONDS);
- verify(deviceContext).onClusterRoleChange(newRole);
+ verify(deviceContext).onClusterRoleChange(oldRole, newRole);
}
@Test
final ListenableFuture<Void> onRoleChanged = roleContext.onRoleChanged(oldRole, newRole);
onRoleChanged.get(5, TimeUnit.SECONDS);
- verify(deviceContext, Mockito.never()).onClusterRoleChange(newRole);
+ verify(deviceContext, Mockito.never()).onClusterRoleChange(oldRole, newRole);
}
@Test
.thenReturn(future);
roleContext.setSalRoleService(salRoleService);
- roleContext.promoteStateToWorking();
final ListenableFuture<Void> onRoleChanged = roleContext.onRoleChanged(oldRole, newRole);
onRoleChanged.get(5, TimeUnit.SECONDS);
- verify(deviceContext).onClusterRoleChange(newRole);
+ verify(deviceContext).onClusterRoleChange(oldRole, newRole);
}
@Test
final ListenableFuture<Void> onRoleChanged = roleContext.onRoleChanged(oldRole, newRole);
onRoleChanged.get(5, TimeUnit.SECONDS);
- verify(deviceContext).onClusterRoleChange(newRole);
+ verify(deviceContext).onClusterRoleChange(oldRole, newRole);
}
private class SetRoleInputMatcher extends ArgumentMatcher<SetRoleInput> {
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Arrays;
import java.util.Collections;
public class StatisticsContextImplParamTest extends StatisticsContextImpMockInitiation {
- public StatisticsContextImplParamTest(boolean isTable, boolean isFlow, boolean isGroup, boolean isMeter, boolean isPort,
- boolean isQueue) {
+ public StatisticsContextImplParamTest(final boolean isTable, final boolean isFlow, final boolean isGroup, final boolean isMeter, final boolean isPort,
+ final boolean isQueue) {
super();
this.isTable = isTable;
this.isFlow = isFlow;
@Test
public void gatherDynamicDataTest() {
- StatisticsContextImpl statisticsContext = new StatisticsContextImpl(mockedDeviceContext);
+ final StatisticsContextImpl statisticsContext = new StatisticsContextImpl(mockedDeviceContext, false);
- ListenableFuture<RpcResult<List<MultipartReply>>> rpcResult = immediateFuture(RpcResultBuilder.success(Collections.<MultipartReply>emptyList()).build());
+ final ListenableFuture<RpcResult<List<MultipartReply>>> rpcResult = immediateFuture(RpcResultBuilder.success(Collections.<MultipartReply>emptyList()).build());
when(mockedStatisticsGatheringService.getStatisticsOfType(any(EventIdentifier.class), any(MultipartType
.class))).thenReturn(rpcResult);
when(mockedStatisticsOnFlyGatheringService.getStatisticsOfType(any(EventIdentifier.class), any(MultipartType
statisticsContext.setStatisticsGatheringService(mockedStatisticsGatheringService);
statisticsContext.setStatisticsGatheringOnTheFlyService(mockedStatisticsOnFlyGatheringService);
- ListenableFuture<Boolean> futureResult = statisticsContext.gatherDynamicData();
+ final ListenableFuture<Boolean> futureResult = statisticsContext.gatherDynamicData();
try {
assertTrue(futureResult.get());
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collections;
@Before
public void setUp() throws Exception {
- when(mockedDeviceContext.getReservedXid()).thenReturn(TEST_XID);
+ when(mockedDeviceContext.reservedXidForDeviceMessage()).thenReturn(TEST_XID);
initStatisticsContext();
}
private void initStatisticsContext() {
- statisticsContext = new StatisticsContextImpl(mockedDeviceContext);
+ statisticsContext = new StatisticsContextImpl(mockedDeviceContext, false);
statisticsContext.setStatisticsGatheringService(mockedStatisticsGatheringService);
statisticsContext.setStatisticsGatheringOnTheFlyService(mockedStatisticsOnFlyGatheringService);
}
@Test
public void testCreateRequestContext() {
- RequestContext<Object> requestContext = statisticsContext.createRequestContext();
+ final RequestContext<Object> requestContext = statisticsContext.createRequestContext();
assertNotNull(requestContext);
assertEquals(TEST_XID, requestContext.getXid().getValue());
Assert.assertFalse(requestContext.getFuture().isDone());
*/
@Test
public void testClose() throws Exception {
- StatisticsContextImpl statisticsContext = new StatisticsContextImpl(mockedDeviceContext);
+ final StatisticsContextImpl statisticsContext = new StatisticsContextImpl(mockedDeviceContext, false);
final RequestContext<Object> requestContext = statisticsContext.createRequestContext();
statisticsContext.close();
try {
final RpcResult<?> rpcResult = requestContext.getFuture().get();
Assert.assertFalse(rpcResult.isSuccessful());
Assert.assertFalse(rpcResult.isSuccessful());
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.error("request future value should be finished", e);
Assert.fail("request context closing failed");
}
try {
deviceConnectionCheckResult.get();
Assert.fail("connection in state RIP should have caused exception here");
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.debug("expected behavior for RIP connection achieved");
Assert.assertTrue(e instanceof ExecutionException);
}
try {
final Boolean checkPositive = deviceConnectionCheckResult.get();
Assert.assertTrue(checkPositive);
- } catch (Exception e) {
+ } catch (final Exception e) {
Assert.fail("connection in state HANDSHAKING should NOT have caused exception here");
}
}
Matchers.eq(StatisticsManagerControlService.class),
Matchers.<StatisticsManagerControlService>any())).thenReturn(serviceControlRegistration);
- statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry);
+ statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, false);
}
@Test
statisticsManager.onDeviceContextLevelUp(mockedDeviceContext);
verify(mockedDeviceContext).addDeviceContextClosedHandler(statisticsManager);
- verify(mockedDeviceContext, Mockito.times(8)).getReservedXid();
+ verify(mockedDeviceContext, Mockito.never()).reservedXidForDeviceMessage();
verify(mockedDeviceState).setDeviceSynchronized(true);
verify(mockedDevicePhaseHandler).onDeviceContextLevelUp(mockedDeviceContext);
verify(hashedWheelTimer).newTimeout(Matchers.<TimerTask>any(), Matchers.anyLong(), Matchers.<TimeUnit>any());
statisticsManager.onDeviceContextLevelUp(mockedDeviceContext);
verify(mockedDeviceContext).addDeviceContextClosedHandler(statisticsManager);
- verify(mockedDeviceContext, Mockito.times(8)).getReservedXid();
+ verify(mockedDeviceContext, Mockito.never()).reservedXidForDeviceMessage();
verify(mockedDeviceState).setDeviceSynchronized(true);
verify(mockedDevicePhaseHandler).onDeviceContextLevelUp(mockedDeviceContext);
verify(hashedWheelTimer, Mockito.never()).newTimeout(Matchers.<TimerTask>any(), Matchers.anyLong(), Matchers.<TimeUnit>any());
public class MdSalRegistratorUtilsTest {
/**
- * Number of currently registrated services (can be changed) in {@link MdSalRegistratorUtils#registerServices
+ * Number of currently registrated services (can be changed) in {@link MdSalRegistratorUtils#registerMasterServices
* (RpcContext, DeviceContext)}
*/
private static final int NUMBER_OF_RPC_SERVICE_REGISTRATION = 11;
when(mockedFeatures.getDatapathId()).thenReturn(mockedDataPathId);
when(mockedDeviceContext.getPrimaryConnectionContext()).thenReturn(mockedConnectionContext);
- MdSalRegistratorUtils.registerServices(mockedRpcContext,mockedDeviceContext, OfpRole.BECOMEMASTER);
+ MdSalRegistratorUtils.registerMasterServices(mockedRpcContext,mockedDeviceContext, OfpRole.BECOMEMASTER);
verify(mockedRpcContext, times(NUMBER_OF_RPC_SERVICE_REGISTRATION)).registerRpcServiceImplementation(any
(RpcService.class.getClass()), any(RpcService.class));
}