* @return handshake pool
*/
ThreadPoolExecutor getHandshakePool();
+
+ @Override
+ void close();
}
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
/**
- * <p>
* The central entity of OFP is the Device Context, which encapsulate the logical state of a switch
* as seen by the controller. Each OpenFlow session is tracked by a Connection Context.
* These attach to a particular Device Context in such a way, that there is at most one primary
* which is enforced by keeping a cap on the number of outstanding requests a particular Request
* Context can have at any point in time. Should this quota be exceeded, any further attempt to make
* a request to the switch will fail immediately, with proper error indication.
- * </p>
*/
public interface DeviceContext extends
OFPContext,
*/
void shutdownConnection();
- /**
- * Initial submit transaction
- */
- void initialSubmitTransaction();
-
/**
* Method add auxiliary connection contexts to this context representing single device connection.
* @param connectionContext new connection context
/**
* @return current devices auxiliary connection contexts
*/
- ConnectionContext getAuxiliaryConnectiobContexts(BigInteger cookie);
+ ConnectionContext getAuxiliaryConnectionContexts(BigInteger cookie);
/**
/**
* @return version
*/
- Short getVersion();
+ short getVersion();
/**
* @return datapathId
AutoCloseable,
DeviceTerminationPhaseHandler {
- CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo, final int numRetries);
+ CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo);
+
}
@Override
public void closeConnection(final boolean propagate) {
- if (null == nodeId){
+ if (Objects.isNull(nodeId)){
SessionStatistics.countEvent(connectionAdapter.getRemoteAddress().toString(), SessionStatistics.ConnectionStatus.CONNECTION_DISCONNECTED_BY_OFP);
} else {
SessionStatistics.countEvent(nodeId.toString(), SessionStatistics.ConnectionStatus.CONNECTION_DISCONNECTED_BY_OFP);
}
- final BigInteger datapathId = featuresReply != null ? featuresReply.getDatapathId() : BigInteger.ZERO;
- LOG.debug("Actively closing connection: {}, datapathId: {}",
- connectionAdapter.getRemoteAddress(), datapathId);
- connectionState = ConnectionContext.CONNECTION_STATE.RIP;
-
- Future<Void> future = Executors.newSingleThreadExecutor().submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- unregisterOutboundQueue();
- return null;
- }
- });
- try {
- future.get(1, TimeUnit.SECONDS);
- LOG.info("Unregister outbound queue successful.");
- } catch (InterruptedException | TimeoutException | ExecutionException e) {
- LOG.warn("Unregister outbound queue throws exception for node {} ", getSafeNodeIdForLOG());
- LOG.trace("Unregister outbound queue throws exception for node {} ", getSafeNodeIdForLOG(), e);
+ final BigInteger datapathId = Objects.nonNull(featuresReply) ? featuresReply.getDatapathId() : BigInteger.ZERO;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Actively closing connection: {}, datapathId: {}",
+ connectionAdapter.getRemoteAddress(), datapathId);
}
+ connectionState = ConnectionContext.CONNECTION_STATE.RIP;
+ unregisterOutboundQueue();
closeHandshakeContext();
if (getConnectionAdapter().isAlive()) {
}
if (propagate) {
- LOG.debug("Propagating device disconnect for node {}", getSafeNodeIdForLOG());
propagateDeviceDisconnectedEvent();
- } else {
- LOG.debug("Close connection without propagating for node {}", getSafeNodeIdForLOG());
}
}
}
private void propagateDeviceDisconnectedEvent() {
- if (null != deviceDisconnectedHandler) {
+ if (Objects.nonNull(deviceDisconnectedHandler)) {
final BigInteger datapathId = featuresReply != null ? featuresReply.getDatapathId() : BigInteger.ZERO;
- LOG.debug("Propagating connection closed event: {}, datapathId:{}.",
- connectionAdapter.getRemoteAddress(), datapathId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Propagating connection closed event: {}, datapathId:{}.",
+ connectionAdapter.getRemoteAddress(), datapathId);
+ }
deviceDisconnectedHandler.onDeviceDisconnected(this);
}
}
*/
@Override
public String getSafeNodeIdForLOG() {
- return null == nodeId ? "null" : nodeId.getValue();
+ return Objects.nonNull(nodeId) ? nodeId.getValue() : "null";
}
@Override
}
private void unregisterOutboundQueue() {
- LOG.debug("Trying unregister outbound queue handler registration for node {}", nodeId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying unregister outbound queue handler registration for node {}", nodeId);
+ }
if (outboundQueueHandlerRegistration != null) {
outboundQueueHandlerRegistration.close();
outboundQueueHandlerRegistration = null;
}
@Override
- public Short getVersion() {
+ public short getVersion() {
return version;
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Created by Martin Bobak <mbobak@cisco.com> on 12.5.2015.
- */
public class OutboundQueueProviderImpl implements OutboundQueueProvider {
private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueProviderImpl.class);
private final short ofVersion;
@Override
public void onHandshakeSuccessful(final GetFeaturesOutput featureOutput, final Short version) {
- LOG.debug("handshake succeeded: {}", connectionContext.getConnectionAdapter().getRemoteAddress());
- closeHandshakeContext();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("handshake succeeded: {}", connectionContext.getConnectionAdapter().getRemoteAddress());
+ }
+ this.handshakeContext.close();
connectionContext.changeStateToWorking();
connectionContext.setFeatures(featureOutput);
connectionContext.setNodeId(InventoryDataServiceUtil.nodeIdFromDatapathId(featureOutput.getDatapathId()));
return new FutureCallback<RpcResult<BarrierOutput>>() {
@Override
public void onSuccess(@Nullable final RpcResult<BarrierOutput> result) {
- LOG.debug("succeeded by getting sweep barrier after post-handshake for device {}", connectionContext.getNodeId().getValue());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("succeeded by getting sweep barrier after post-handshake for device {}", connectionContext.getNodeId().getValue());
+ }
try {
ConnectionStatus connectionStatusResult = deviceConnectedHandler.deviceConnected(connectionContext);
if (!ConnectionStatus.MAY_CONTINUE.equals(connectionStatusResult)) {
};
}
- protected ListenableFuture<RpcResult<BarrierOutput>> fireBarrier(final Short version, final long xid) {
+ private ListenableFuture<RpcResult<BarrierOutput>> fireBarrier(final Short version, final long xid) {
final BarrierInput barrierInput = new BarrierInputBuilder()
.setXid(xid)
.setVersion(version)
.build();
return JdkFutureAdapters.listenInPoolThread(
- connectionContext.getConnectionAdapter().barrier(barrierInput));
+ this.connectionContext.getConnectionAdapter().barrier(barrierInput));
}
@Override
public void onHandshakeFailure() {
- LOG.debug("handshake failed: {}", connectionContext.getConnectionAdapter().getRemoteAddress());
- closeHandshakeContext();
- connectionContext.closeConnection(false);
- }
-
- private void closeHandshakeContext() {
- try {
- handshakeContext.close();
- } catch (final Exception e) {
- LOG.error("Closing handshake context failed: {}", e.getMessage());
- LOG.debug("Detail in handshake context close: {}", e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("handshake failed: {}", this.connectionContext.getConnectionAdapter().getRemoteAddress());
}
+ this.handshakeContext.close();
+ this.connectionContext.closeConnection(false);
}
@Override
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
private static final float LOW_WATERMARK_FACTOR = 0.75f;
// TODO: high water mark factor should be parametrized
private static final float HIGH_WATERMARK_FACTOR = 0.95f;
+ private boolean initialized;
private ConnectionContext primaryConnectionContext;
private final DeviceState deviceState;
private final DataBroker dataBroker;
private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
- private final TransactionChainManager transactionChainManager;
- private final DeviceFlowRegistry deviceFlowRegistry;
- private final DeviceGroupRegistry deviceGroupRegistry;
- private final DeviceMeterRegistry deviceMeterRegistry;
+ private TransactionChainManager transactionChainManager;
+ private DeviceFlowRegistry deviceFlowRegistry;
+ private DeviceGroupRegistry deviceGroupRegistry;
+ private DeviceMeterRegistry deviceMeterRegistry;
private final PacketInRateLimiter packetInLimiter;
private final MessageSpy messageSpy;
private final ItemLifeCycleKeeper flowLifeCycleKeeper;
private volatile CONTEXT_STATE state;
private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
- public DeviceContextImpl(
+ DeviceContextImpl(
@Nonnull final ConnectionContext primaryConnectionContext,
@Nonnull final DataBroker dataBroker,
@Nonnull final MessageSpy messageSpy,
@Nonnull final DeviceManager manager,
final ConvertorExecutor convertorExecutor,
final boolean skipTableFeatures) {
- this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
+ this.primaryConnectionContext = primaryConnectionContext;
this.deviceInfo = primaryConnectionContext.getDeviceInfo();
this.deviceState = new DeviceStateImpl();
- this.dataBroker = Preconditions.checkNotNull(dataBroker);
- this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo);
- auxiliaryConnectionContexts = new HashMap<>();
- deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker, deviceInfo.getNodeInstanceIdentifier());
- deviceGroupRegistry = new DeviceGroupRegistryImpl();
- deviceMeterRegistry = new DeviceMeterRegistryImpl();
+ this.dataBroker = dataBroker;
+ this.auxiliaryConnectionContexts = new HashMap<>();
this.messageSpy = Preconditions.checkNotNull(messageSpy);
- this.deviceManager = Preconditions.checkNotNull(manager);
+ this.deviceManager = manager;
- packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
+ this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
/*initial*/ 1000, /*initial*/2000, this.messageSpy, REJECTED_DRAIN_FACTOR);
this.translatorLibrary = translatorLibrary;
- portStatusTranslator = translatorLibrary.lookupTranslator(
+ this.portStatusTranslator = translatorLibrary.lookupTranslator(
new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
- packetInTranslator = translatorLibrary.lookupTranslator(
+ this.packetInTranslator = translatorLibrary.lookupTranslator(
new TranslatorKey(deviceInfo.getVersion(), PacketIn.class.getName()));
- flowRemovedTranslator = translatorLibrary.lookupTranslator(
+ this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
- itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
- flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
- itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
+ this.itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
+ this.flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
+ this.itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
this.state = CONTEXT_STATE.INITIALIZATION;
this.convertorExecutor = convertorExecutor;
this.skipTableFeatures = skipTableFeatures;
+ this.initialized = false;
}
- /**
- * This method is called from {@link DeviceManagerImpl} only. So we could say "posthandshake process finish"
- * and we are able to set a scheduler for an automatic transaction submitting by time (0,5sec).
- */
@Override
public void initialSubmitTransaction() {
transactionChainManager.initialSubmitWriteTransaction();
public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
final InstanceIdentifier<T> path,
final T data){
- transactionChainManager.writeToTransaction(store, path, data, false);
+ if (Objects.nonNull(transactionChainManager)) {
+ transactionChainManager.writeToTransaction(store, path, data, false);
+ }
}
@Override
public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
final InstanceIdentifier<T> path,
final T data){
- transactionChainManager.writeToTransaction(store, path, data, true);
+ if (Objects.nonNull(transactionChainManager)) {
+ transactionChainManager.writeToTransaction(store, path, data, true);
+ }
}
@Override
public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier<T> path) throws TransactionChainClosedException {
- transactionChainManager.addDeleteOperationTotTxChain(store, path);
+ if (Objects.nonNull(transactionChainManager)) {
+ transactionChainManager.addDeleteOperationTotTxChain(store, path);
+ }
}
@Override
public boolean submitTransaction() {
- return transactionChainManager.submitWriteTransaction();
+ return Objects.nonNull(transactionChainManager) && transactionChainManager.submitWriteTransaction();
}
@Override
}
@Override
- public ConnectionContext getAuxiliaryConnectiobContexts(final BigInteger cookie) {
+ public ConnectionContext getAuxiliaryConnectionContexts(final BigInteger cookie) {
return auxiliaryConnectionContexts.get(new SwitchConnectionCookieOFImpl(cookie.longValue()));
}
@Override
public synchronized void shutdownConnection() {
- LOG.debug("Shutdown method for node {}", getDeviceInfo().getLOGValue());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Shutdown method for node {}", getDeviceInfo().getLOGValue());
+ }
if (CONTEXT_STATE.TERMINATION.equals(getState())) {
LOG.debug("DeviceCtx for Node {} is in termination process.", getDeviceInfo().getLOGValue());
return;
}
- setState(CONTEXT_STATE.TERMINATION);
if (ConnectionContext.CONNECTION_STATE.RIP.equals(getPrimaryConnectionContext().getConnectionState())) {
LOG.debug("ConnectionCtx for Node {} is in RIP state.", getDeviceInfo().getLOGValue());
@Override
public ListenableFuture<Void> shuttingDownDataStoreTransactions() {
- return transactionChainManager.shuttingDown();
+ ListenableFuture<Void> future = Futures.immediateFuture(null);
+ if (Objects.nonNull(this.transactionChainManager)) {
+ future = this.transactionChainManager.shuttingDown();
+ }
+ return future;
}
@VisibleForTesting
@Override
public ListenableFuture<Void> stopClusterServices(boolean deviceDisconnected) {
- return this.transactionChainManager.deactivateTransactionManager();
+ ListenableFuture<Void> future = Futures.immediateFuture(null);
+ if (Objects.nonNull(this.transactionChainManager)) {
+ future = this.transactionChainManager.deactivateTransactionManager();
+ }
+ return future;
}
@Override
@Override
public void putLifecycleServiceIntoTxChainManager(final LifecycleService lifecycleService){
- this.transactionChainManager.setLifecycleService(lifecycleService);
+ if (Objects.nonNull(this.transactionChainManager)) {
+ this.transactionChainManager.setLifecycleService(lifecycleService);
+ }
}
@Override
LOG.info("Starting device context cluster services for node {}", deviceInfo.getLOGValue());
+ lazyTransactionManagerInitialiaztion();
+
this.transactionChainManager.activateTransactionManager();
try {
return this.clusterInitializationPhaseHandler.onContextInstantiateService(getPrimaryConnectionContext());
}
+
+ @VisibleForTesting
+ void lazyTransactionManagerInitialiaztion() {
+ if (!this.initialized) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Transaction chain manager for node {} created", deviceInfo.getLOGValue());
+ }
+ this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo);
+ this.deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker, deviceInfo.getNodeInstanceIdentifier());
+ this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
+ this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
+ this.initialized = true;
+ }
+ }
}
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
final ConvertorExecutor convertorExecutor,
final boolean skipTableFeatures) {
- this.switchFeaturesMandatory = switchFeaturesMandatory;
- this.globalNotificationQuota = globalNotificationQuota;
- this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
- this.skipTableFeatures = skipTableFeatures;
- this.dataBroker = Preconditions.checkNotNull(dataBroker);
- this.convertorExecutor = convertorExecutor;
- this.hashedWheelTimer = hashedWheelTimer;
+ this.dataBroker = dataBroker;
+
/* merge empty nodes to oper DS to predict any problems with missing parent for Node */
final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
-
final NodesBuilder nodesBuilder = new NodesBuilder();
nodesBuilder.setNode(Collections.<Node>emptyList());
tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
throw new IllegalStateException(e);
}
+ this.switchFeaturesMandatory = switchFeaturesMandatory;
+ this.globalNotificationQuota = globalNotificationQuota;
+ this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
+ this.skipTableFeatures = skipTableFeatures;
+ this.convertorExecutor = convertorExecutor;
+ this.hashedWheelTimer = hashedWheelTimer;
this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
this.barrierCountLimit = barrierCountLimit;
-
- spyPool = new ScheduledThreadPoolExecutor(1);
+ this.spyPool = new ScheduledThreadPoolExecutor(1);
this.singletonServiceProvider = singletonServiceProvider;
this.notificationPublishService = notificationPublishService;
this.messageSpy = messageSpy;
*/
if (deviceContexts.containsKey(deviceInfo)) {
DeviceContext deviceContext = deviceContexts.get(deviceInfo);
- LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo);
+ LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo.getLOGValue());
if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
LOG.warn("Node {} context state not in TERMINATION state.",
connectionContext.getDeviceInfo().getLOGValue());
convertorExecutor,
skipTableFeatures);
- deviceContexts.putIfAbsent(deviceInfo, deviceContext);
+ deviceContexts.put(deviceInfo, deviceContext);
final LifecycleService lifecycleService = new LifecycleServiceImpl();
lifecycleService.setDeviceContext(deviceContext);
deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
- lifecycleServices.putIfAbsent(deviceInfo, lifecycleService);
+ lifecycleServices.put(deviceInfo, lifecycleService);
deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
if (freshNotificationLimit < 100) {
freshNotificationLimit = 100;
}
- LOG.debug("fresh notification limit = {}", freshNotificationLimit);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("fresh notification limit = {}", freshNotificationLimit);
+ }
for (final DeviceContext deviceContext : deviceContexts.values()) {
deviceContext.updatePacketInRateLimit(freshNotificationLimit);
}
deviceCtx.shuttingDownDataStoreTransactions();
}
- if (spyPool != null) {
- spyPool.shutdownNow();
- spyPool = null;
- }
+ Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
+ spyPool = null;
+
}
@Override
}
if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
- LOG.debug("Device context for node {} is already is termination state, waiting for close all context");
+ LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
return;
}
deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
}
//TODO: Auxiliary connections supported ?
- {
/* Device is disconnected and so we need to close TxManager */
- final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
- Futures.addCallback(future, new FutureCallback<Void>() {
+ final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
+ Futures.addCallback(future, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
- deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
- }
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
+ deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
+ }
- @Override
- public void onFailure(final Throwable t) {
- LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
- LOG.trace("TxChainManager failed by closing. ", t);
- deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
- }
- });
- /* Add timer for Close TxManager because it could fain ind cluster without notification */
- final TimerTask timerTask = timeout -> {
- if (!future.isDone()) {
- LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
- future.cancel(false);
- }
- };
- hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
- }
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
+ LOG.trace("TxChainManager failed by closing. ", t);
+ deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
+ }
+ });
+ /* Add timer for Close TxManager because it could fain ind cluster without notification */
+ final TimerTask timerTask = timeout -> {
+ if (!future.isDone()) {
+ LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
+ future.cancel(false);
+ }
+ };
+ hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
}
@VisibleForTesting
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply;
/**
- * openflowplugin-impl
- * org.opendaylight.openflowplugin.impl.device
- * <p/>
- * DeviceState is builded from {@link FeaturesReply} and {@link NodeId}. Both values are inside
- * {@link org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext}
- *
+ * Holder for device features
*/
class DeviceStateImpl implements DeviceState {
private boolean portStatisticsAvailable;
private boolean queueStatisticsAvailable;
- public DeviceStateImpl() {
+ DeviceStateImpl() {
}
@Override
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CancellationException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class TransactionChainManager implements TransactionChainListener, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
+ private static final String CANNOT_WRITE_INTO_TRANSACTION = "Cannot write into transaction.";
private final Object txLock = new Object();
- private final KeyedInstanceIdentifier<Node, NodeKey> nodeII;
private final DataBroker dataBroker;
+ private final String nodeId;
private LifecycleService lifecycleService;
@GuardedBy("txLock")
TransactionChainManager(@Nonnull final DataBroker dataBroker,
@Nonnull final DeviceInfo deviceInfo) {
- this.dataBroker = Preconditions.checkNotNull(dataBroker);
- this.nodeII = deviceInfo.getNodeInstanceIdentifier();
+ this.dataBroker = dataBroker;
+ this.nodeId = deviceInfo.getNodeInstanceIdentifier().getKey().getId().getValue();
this.transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
- lastSubmittedFuture = Futures.immediateFuture(null);
- LOG.debug("created txChainManager for {}", this.nodeII);
- }
-
- private NodeId nodeId() {
- return nodeII.getKey().getId();
+ this.lastSubmittedFuture = Futures.immediateFuture(null);
}
@GuardedBy("txLock")
private void createTxChain() {
- if (txChainFactory != null) {
- txChainFactory.close();
- }
+ Optional.ofNullable(txChainFactory).ifPresent(TransactionChain::close);
txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
}
* transactions. Call this method for MASTER role only.
*/
void activateTransactionManager() {
- LOG.trace("activateTransactionManager for node {} transaction submit is set to {}", nodeId(), submitIsEnabled);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("activateTransactionManager for node {} transaction submit is set to {}", this.nodeId, submitIsEnabled);
+ }
synchronized (txLock) {
if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) {
- LOG.debug("Transaction Factory create {}", nodeId());
Preconditions.checkState(txChainFactory == null, "TxChainFactory survive last close.");
Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
this.submitIsEnabled = false;
this.initCommit = true;
createTxChain();
- } else {
- LOG.debug("Transaction is active {}", nodeId());
}
}
}
* @return Future
*/
ListenableFuture<Void> deactivateTransactionManager() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("deactivateTransactionManager for node {}", this.nodeId);
+ }
final ListenableFuture<Void> future;
synchronized (txLock) {
if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
- LOG.debug("Submitting all transactions if we were in status WORKING for Node {}", nodeId());
transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
future = txChainShuttingDown();
Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
- LOG.debug("Transaction Factory deactivate for Node {}", nodeId());
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- txChainFactory.close();
- txChainFactory = null;
+ removeTxChainFactory();
}
@Override
public void onFailure(final Throwable t) {
- txChainFactory.close();
- txChainFactory = null;
+ removeTxChainFactory();
}
});
} else {
return future;
}
+ private void removeTxChainFactory() {
+ Optional.ofNullable(txChainFactory).ifPresent(TransactionChain::close);
+ txChainFactory = null;
+ }
+
boolean submitWriteTransaction() {
synchronized (txLock) {
if (!submitIsEnabled) {
- LOG.trace("transaction not committed - submit block issued");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("transaction not committed - submit block issued");
+ }
return false;
}
- if (wTx == null) {
- LOG.trace("nothing to commit - submit returns true");
+ if (Objects.isNull(wTx)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("nothing to commit - submit returns true");
+ }
return true;
}
Preconditions.checkState(TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus),
- "we have here Uncompleted Transaction for node {} and we are not MASTER", nodeII);
+ "we have here Uncompleted Transaction for node {} and we are not MASTER", this.nodeId);
final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = wTx.submit();
Futures.addCallback(submitFuture, new FutureCallback<Void>() {
@Override
}
}
if (initCommit) {
- LOG.warn("Initial commit failed. ", t);
wTx = null;
- if (Objects.nonNull(lifecycleService)) {
- lifecycleService.closeConnection();
- }
+ Optional.ofNullable(lifecycleService).ifPresent(LifecycleService::closeConnection);
}
}
});
<T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
final InstanceIdentifier<T> path){
final WriteTransaction writeTx = getTransactionSafely();
- if (writeTx != null) {
- LOG.trace("addDeleteOperation called with path {} ", path);
+ if (Objects.nonNull(writeTx)) {
writeTx.delete(store, path);
} else {
- LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", nodeII, path);
- throw new TransactionChainClosedException("Cannot write into transaction.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", this.nodeId, path);
+ }
+ throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
}
}
final T data,
final boolean createParents){
final WriteTransaction writeTx = getTransactionSafely();
- if (writeTx != null) {
- LOG.trace("writeToTransaction called with path {} ", path);
+ if (Objects.nonNull(writeTx)) {
writeTx.put(store, path, data, createParents);
} else {
- LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", nodeII, path);
- throw new TransactionChainClosedException("Cannot write into transaction.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", this.nodeId, path);
+ }
+ throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
}
}
private WriteTransaction getTransactionSafely() {
synchronized (txLock) {
if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
- if (wTx == null && txChainFactory != null) {
- wTx = txChainFactory.newWriteOnlyTransaction();
- }
+ Optional.ofNullable(txChainFactory).ifPresent(bindingTransactionChain -> wTx = txChainFactory.newWriteOnlyTransaction());
}
}
return wTx;
}
ListenableFuture<Void> shuttingDown() {
- LOG.debug("TxManager is going SHUTTING_DOWN for node {}", nodeII.getKey().getId().getValue());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("TxManager is going SHUTTING_DOWN for node {}", this.nodeId);
+ }
ListenableFuture<Void> future;
synchronized (txLock) {
this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
// hijack md-sal thread
future = lastSubmittedFuture;
} else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Submitting all transactions for Node {}", this.nodeId);
+ }
// hijack md-sal thread
future = wTx.submit();
wTx = null;
@Override
public void close() {
- LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}, will wait for ownershipservice to notify"
- , nodeII);
- Preconditions.checkState(TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus));
- Preconditions.checkState(wTx == null);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}", this.nodeId);
+ }
synchronized (txLock) {
- if (txChainFactory != null) {
- txChainFactory.close();
- txChainFactory = null;
- }
+ removeTxChainFactory();
}
- Preconditions.checkState(txChainFactory == null);
}
private enum TransactionChainManagerStatus {
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.util.HashedWheelTimer;
import io.netty.util.TimerTask;
+
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
-import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
private static final Logger LOG = LoggerFactory.getLogger(RoleContextImpl.class);
// Maximum limit of timeout retries when cleaning DS, to prevent infinite recursive loops
- private static final int MAX_CLEAN_DS_RETRIES = 3;
+ private static final int MAX_CLEAN_DS_RETRIES = 0;
private SalRoleService salRoleService = null;
private final HashedWheelTimer hashedWheelTimer;
private CONTEXT_STATE state;
private final RoleManager myManager;
private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
+ private final LifecycleService lifecycleService;
RoleContextImpl(final DeviceInfo deviceInfo,
final HashedWheelTimer hashedWheelTimer,
- final RoleManager myManager) {
+ final RoleManager myManager,
+ final LifecycleService lifecycleService) {
this.deviceInfo = deviceInfo;
state = CONTEXT_STATE.WORKING;
this.myManager = myManager;
this.hashedWheelTimer = hashedWheelTimer;
+ this.lifecycleService = lifecycleService;
}
@Nullable
@Override
public void onFailure(final Throwable throwable) {
LOG.warn("Was not able to set MASTER role on device, node {}", deviceInfo.getLOGValue());
+ lifecycleService.closeConnection();
}
});
}
public void onFailure(final Throwable throwable) {
LOG.warn("Was not able to set role SLAVE to device on node {} ", deviceInfo.getLOGValue());
LOG.trace("Error occurred on device role setting, probably connection loss: ", throwable);
- myManager.removeDeviceFromOperationalDS(deviceInfo, MAX_CLEAN_DS_RETRIES);
-
+ myManager.removeDeviceFromOperationalDS(deviceInfo);
}
});
return future;
} else {
- return myManager.removeDeviceFromOperationalDS(deviceInfo, MAX_CLEAN_DS_RETRIES);
+ return myManager.removeDeviceFromOperationalDS(deviceInfo);
}
}
@VisibleForTesting
ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
- LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId());
- final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
- final Short version = deviceInfo.getVersion();
- if (null == version) {
- LOG.debug("Device version is null");
- return Futures.immediateFuture(null);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId());
}
- if (version < OFConstants.OFP_VERSION_1_3) {
- LOG.debug("Device version not support ROLE");
- return Futures.immediateFuture(null);
- } else {
+ final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
+ if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) {
final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
- .setNode(new NodeRef(DeviceStateUtil.createNodeInstanceIdentifier(deviceInfo.getNodeId()))).build();
+ .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())).build();
setRoleOutputFuture = this.salRoleService.setRole(setRoleInput);
final TimerTask timerTask = timeout -> {
if (!setRoleOutputFuture.isDone()) {
- LOG.warn("New role {} was not propagated to device {} during 10 sec", newRole, deviceInfo.getLOGValue());
+ LOG.warn("New role {} was not propagated to device {} during 5 sec", newRole, deviceInfo.getLOGValue());
setRoleOutputFuture.cancel(true);
}
};
- hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
+ hashedWheelTimer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
+ } else {
+ LOG.info("Device: {} with version: {} does not support role", deviceInfo.getLOGValue(), deviceInfo.getVersion());
+ return Futures.immediateFuture(null);
}
return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
}
@Override
public void onFailure(final Throwable throwable) {
LOG.warn("Was not able to set MASTER role on device, node {}", deviceInfo.getLOGValue());
+ lifecycleService.closeConnection();
}
});
import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
-import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
@Override
public void onDeviceContextLevelUp(@CheckForNull final DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
final DeviceContext deviceContext = Preconditions.checkNotNull(lifecycleService.getDeviceContext());
- final RoleContext roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, this);
+ final RoleContext roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, this, lifecycleService);
roleContext.setSalRoleService(new SalRoleServiceImpl(roleContext, deviceContext));
Verify.verify(contexts.putIfAbsent(deviceInfo, roleContext) == null, "Role context for master Node %s is still not closed.", deviceInfo.getLOGValue());
Futures.addCallback(roleContext.makeDeviceSlave(), new FutureCallback<RpcResult<SetRoleOutput>>() {
@Override
public void onFailure(Throwable throwable) {
LOG.warn("Was not able to set role SLAVE to device on node {} ",deviceInfo.getLOGValue());
+ lifecycleService.closeConnection();
}
});
lifecycleService.setRoleContext(roleContext);
// got here because last known role is LEADER and DS might need clearing up
final RoleContext roleContext = iterator.next();
contexts.remove(roleContext.getDeviceInfo());
- removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
+ removeDeviceFromOperationalDS(roleContext.getDeviceInfo());
}
}
}
@Override
- public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo, final int numRetries) {
+ public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
- delWtx.delete(LogicalDatastoreType.OPERATIONAL, DeviceStateUtil.createNodeInstanceIdentifier(deviceInfo.getNodeId()));
+ delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier());
final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
Futures.addCallback(delFuture, new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
- contexts.remove(deviceInfo);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
+ }
}
@Override
public void onFailure(@Nonnull final Throwable t) {
- // If we have any retries left, we will try to clean the datastore again
- if (numRetries > 0) {
- // We "used" one retry here, so decrement it
- final int curRetries = numRetries - 1;
- LOG.debug("Delete node {} failed with exception {}. Trying again (retries left: {})", deviceInfo.getLOGValue(), t, curRetries);
- // Recursive call to this method with "one less" retry
- removeDeviceFromOperationalDS(deviceInfo, curRetries);
- return;
- }
-
- // No retries left, so we will just close the role context, and ignore datastore cleanup
- LOG.warn("Delete node {} failed with exception {}. No retries left, aborting", deviceInfo.getLOGValue(), t);
- contexts.remove(deviceInfo);
+ LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t);
}
});
package org.opendaylight.openflowplugin.impl.rpc.listener;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.Identifiable;
public class ItemLifecycleListenerImpl implements ItemLifecycleListener {
private static final Logger LOG = LoggerFactory.getLogger(ItemLifecycleListenerImpl.class);
- public static final String NOT_ABLE_TO_WRITE_TO_TRANSACTION = "Not able to write to transaction: {}";
+ private static final String NOT_ABLE_TO_WRITE_TO_TRANSACTION = "Not able to write to transaction: ";
- private final DeviceContext deviceContext;
+ private final TxFacade txFacade;
- public ItemLifecycleListenerImpl(DeviceContext deviceContext) {
- this.deviceContext = deviceContext;
+ public ItemLifecycleListenerImpl(final TxFacade txFacade) {
+ this.txFacade = txFacade;
}
@Override
public <I extends Identifiable<K> & DataObject, K extends Identifier<I>> void onAdded(KeyedInstanceIdentifier<I, K> itemPath, I itemBody) {
try {
- deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, itemPath, itemBody);
- deviceContext.submitTransaction();
+ txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, itemPath, itemBody);
+ txFacade.submitTransaction();
} catch (Exception e) {
LOG.warn(NOT_ABLE_TO_WRITE_TO_TRANSACTION, e);
}
@Override
public <I extends Identifiable<K> & DataObject, K extends Identifier<I>> void onRemoved(KeyedInstanceIdentifier<I, K> itemPath) {
try {
- deviceContext.addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, itemPath);
- deviceContext.submitTransaction();
+ txFacade.addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, itemPath);
+ txFacade.submitTransaction();
} catch (Exception e) {
LOG.warn(NOT_ABLE_TO_WRITE_TO_TRANSACTION, e);
}
@Override
public <I extends Identifiable<K> & DataObject, K extends Identifier<I>> void onUpdated(KeyedInstanceIdentifier<I, K> itemPath, I itemBody) {
try {
- deviceContext.addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, itemPath);
- deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, itemPath, itemBody);
- deviceContext.submitTransaction();
+ txFacade.addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, itemPath);
+ txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, itemPath, itemBody);
+ txFacade.submitTransaction();
} catch (Exception e) {
LOG.warn(NOT_ABLE_TO_WRITE_TO_TRANSACTION, e);
}
final DeviceInfo deviceInfo) {
if (!statisticsContext.isSchedulingEnabled()) {
- LOG.debug("Disabled statistics scheduling for device: {}", deviceInfo.getNodeId().getValue());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Disabled statistics scheduling for device: {}", deviceInfo.getNodeId().getValue());
+ }
return;
}
- LOG.debug("POLLING ALL STATISTICS for device: {}", deviceInfo.getNodeId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("POLLING ALL STATISTICS for device: {}", deviceInfo.getNodeId());
+ }
timeCounter.markStart();
final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
@Override
public void onFailure(@Nonnull final Throwable throwable) {
timeCounter.addTimeMark();
- LOG.warn("Statistics gathering for single node was not successful: {}", throwable);
+ LOG.warn("Statistics gathering for single node {} was not successful: ", deviceInfo.getLOGValue(), throwable.getMessage());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Gathering for node {} failure: ", deviceInfo.getLOGValue(), throwable);
+ }
calculateTimerDelay(timeCounter);
if (throwable instanceof CancellationException) {
/* This often happens when something wrong with akka or DS, so closing connection will help to restart device **/
contexts.get(deviceInfo).getLifecycleService().closeConnection();
} else {
- scheduleNextPolling(deviceState, deviceInfo, statisticsContext, timeCounter);
+ if (throwable instanceof IllegalStateException) {
+ stopScheduling(deviceInfo);
+ } else {
+ scheduleNextPolling(deviceState, deviceInfo, statisticsContext, timeCounter);
+ }
}
}
});
final DeviceInfo deviceInfo,
final StatisticsContext statisticsContext,
final TimeCounter timeCounter) {
- LOG.debug("SCHEDULING NEXT STATISTICS POLLING for device: {}", deviceInfo.getNodeId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SCHEDULING NEXT STATISTICS POLLING for device: {}", deviceInfo.getNodeId());
+ }
if (!isStatisticsPollingEnabled) {
final Timeout pollTimeout = hashedWheelTimer.newTimeout(
timeout -> pollStatistics(
@Override
public void startScheduling(final DeviceInfo deviceInfo) {
if (isStatisticsPollingEnabled) {
- LOG.info("Statistics are shut down for device: {}", deviceInfo.getNodeId());
+ LOG.info("Statistics are shutdown for device: {}", deviceInfo.getNodeId());
return;
}
@Override
public void stopScheduling(final DeviceInfo deviceInfo) {
- LOG.debug("Stopping statistics scheduling for device: {}", deviceInfo.getNodeId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping statistics scheduling for device: {}", deviceInfo.getNodeId());
+ }
final StatisticsContext statisticsContext = contexts.get(deviceInfo);
if (statisticsContext == null) {
import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
-import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleSource;
import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
-import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
private DeviceManager deviceManager;
@Mock
private ConvertorExecutor convertorExecutor;
- private LifecycleService lifecycleService;
@Mock
private MessageSpy messageSpy;
xid = new Xid(atomicLong.incrementAndGet());
xidMulti = new Xid(atomicLong.incrementAndGet());
+ ((DeviceContextImpl) deviceContext).lazyTransactionManagerInitialiaztion();
Mockito.doNothing().when(deviceContextSpy).writeToTransaction(Mockito.<LogicalDatastoreType>any(), Mockito.<InstanceIdentifier>any(), any());
+
}
@Test(expected = NullPointerException.class)
@Test
public void testAuxiliaryConnectionContext() {
final ConnectionContext mockedConnectionContext = addDummyAuxiliaryConnectionContext();
- final ConnectionContext pickedConnectiobContexts = deviceContext.getAuxiliaryConnectiobContexts(DUMMY_COOKIE);
+ final ConnectionContext pickedConnectiobContexts = deviceContext.getAuxiliaryConnectionContexts(DUMMY_COOKIE);
assertEquals(mockedConnectionContext, pickedConnectiobContexts);
}
@Test
final ConnectionAdapter mockedAuxConnectionAdapter = mock(ConnectionAdapter.class);
when(mockedConnectionContext.getConnectionAdapter()).thenReturn(mockedAuxConnectionAdapter);
- assertNotNull(deviceContext.getAuxiliaryConnectiobContexts(DUMMY_COOKIE));
+ assertNotNull(deviceContext.getAuxiliaryConnectionContexts(DUMMY_COOKIE));
deviceContext.removeAuxiliaryConnectionContext(mockedConnectionContext);
- assertNull(deviceContext.getAuxiliaryConnectiobContexts(DUMMY_COOKIE));
+ assertNull(deviceContext.getAuxiliaryConnectionContexts(DUMMY_COOKIE));
}
private ConnectionContext addDummyAuxiliaryConnectionContext() {
import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
+import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
@Before
public void setup() throws CandidateAlreadyRegisteredException {
- roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, roleManager);
+ roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, roleManager, lifecycleService);
roleContext.setSalRoleService(salRoleService);
Mockito.when(deviceInfo.getNodeId()).thenReturn(nodeId);
Mockito.when(salRoleService.setRole(Mockito.<SetRoleInput>any())).thenReturn(Futures.immediateFuture(null));
+ Mockito.when(deviceInfo.getNodeInstanceIdentifier()).thenReturn(DeviceStateUtil.createNodeInstanceIdentifier(nodeId));
roleContextSpy = Mockito.spy((RoleContextImpl) roleContext);
}
@Test
public void startupClusterServices() throws Exception {
- Mockito.when(deviceInfo.getVersion()).thenReturn(null);
roleContextSpy.startupClusterServices();
Mockito.verify(roleContextSpy).sendRoleChangeToDevice(OfpRole.BECOMEMASTER);
}
public void stopClusterServicesNotDisconnected() throws Exception {
roleContextSpy.stopClusterServices(false);
Mockito.verify(roleContextSpy).sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
- Mockito.verify(roleManager, Mockito.never()).removeDeviceFromOperationalDS(Mockito.<DeviceInfo>any(), Mockito.anyInt());
+ Mockito.verify(roleManager, Mockito.never()).removeDeviceFromOperationalDS(Mockito.<DeviceInfo>any());
}
@Test
public void stopClusterServicesDisconnected() throws Exception {
roleContextSpy.stopClusterServices(true);
- Mockito.verify(roleManager, Mockito.atLeastOnce()).removeDeviceFromOperationalDS(Mockito.<DeviceInfo>any(), Mockito.anyInt());
+ Mockito.verify(roleManager, Mockito.atLeastOnce()).removeDeviceFromOperationalDS(Mockito.<DeviceInfo>any());
}
@Test
import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
+import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
@RunWith(MockitoJUnitRunner.class)
public class RoleManagerImplTest {
Mockito.when(deviceInfo.getNodeId()).thenReturn(nodeId);
Mockito.when(deviceInfo2.getNodeId()).thenReturn(nodeId2);
Mockito.when(deviceInfo.getDatapathId()).thenReturn(BigInteger.TEN);
+ Mockito.when(deviceInfo.getNodeInstanceIdentifier()).thenReturn(DeviceStateUtil.createNodeInstanceIdentifier(nodeId));
Mockito.when(lifecycleService.getDeviceContext()).thenReturn(deviceContext);
roleManager = new RoleManagerImpl(dataBroker, new HashedWheelTimer());
roleManager.setDeviceInitializationPhaseHandler(deviceInitializationPhaseHandler);
@Test
public void testCloseMaster() throws Exception {
roleManagerSpy.close();
- inOrder.verify(roleManagerSpy).removeDeviceFromOperationalDS(Mockito.eq(deviceInfo), Mockito.anyInt());
+ inOrder.verify(roleManagerSpy).removeDeviceFromOperationalDS(Mockito.eq(deviceInfo));
inOrder.verifyNoMoreInteractions();
}