Bug 6554 Fix rejecting connections 28/45228/1
authorJozef Bacigal <jozef.bacigal@pantheon.tech>
Mon, 22 Aug 2016 11:42:50 +0000 (13:42 +0200)
committerTomas Slusny <tomas.slusny@pantheon.sk>
Tue, 6 Sep 2016 13:52:05 +0000 (15:52 +0200)
- two way device context close removed
- txManager lambda improvements
- txManager logs improvements
- device context lazy initialization
- role context close improvement

Change-Id: I1a9f5890f267ada9a2978f9a92f792ab2411c1aa
Signed-off-by: Jozef Bacigal <jozef.bacigal@pantheon.tech>
(cherry picked from commit 5eb3934ab14beb9646f039e09f698dd1b118fb9b)

18 files changed:
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/connection/HandshakeContext.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceInfo.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/role/RoleManager.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/OutboundQueueProviderImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/HandshakeListenerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceStateImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/listener/ItemLifecycleListenerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsManagerImpl.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/role/RoleContextImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/role/RoleManagerImplTest.java

index 2ffb510c3892b972ce042db4e3af1af5e1093ad1..aeff6660958b24b964a857335394ef12bcb3f7df 100644 (file)
@@ -24,4 +24,7 @@ public interface HandshakeContext extends AutoCloseable {
      * @return handshake pool
      */
     ThreadPoolExecutor getHandshakePool();
+
+    @Override
+    void close();
 }
index e204f8e9d88879eda8ef804b23bd81db4a088139..676c21c780ba112cc69264d4d2b192c0b30d58e1 100644 (file)
@@ -23,7 +23,6 @@ import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.Messa
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
 
 /**
- * <p>
  * The central entity of OFP is the Device Context, which encapsulate the logical state of a switch
  * as seen by the controller. Each OpenFlow session is tracked by a Connection Context.
  * These attach to a particular Device Context in such a way, that there is at most one primary
@@ -36,7 +35,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
  * which is enforced by keeping a cap on the number of outstanding requests a particular Request
  * Context can have at any point in time. Should this quota be exceeded, any further attempt to make
  * a request to the switch will fail immediately, with proper error indication.
- * </p>
  */
 public interface DeviceContext extends
         OFPContext,
@@ -50,11 +48,6 @@ public interface DeviceContext extends
      */
     void shutdownConnection();
 
-    /**
-     * Initial submit transaction
-     */
-    void initialSubmitTransaction();
-
     /**
      * Method add auxiliary connection contexts to this context representing single device connection.
      * @param connectionContext new connection context
@@ -88,7 +81,7 @@ public interface DeviceContext extends
     /**
      * @return current devices auxiliary connection contexts
      */
-    ConnectionContext getAuxiliaryConnectiobContexts(BigInteger cookie);
+    ConnectionContext getAuxiliaryConnectionContexts(BigInteger cookie);
 
 
     /**
index ca94ad152c8621077cabf1ddd8e7a589f9d5e5a9..4af09607a3ff22a8d005d50a8e8d712ff81b3c9d 100644 (file)
@@ -33,7 +33,7 @@ public interface DeviceInfo extends XidSequencer {
     /**
      * @return version
      */
-    Short getVersion();
+    short getVersion();
 
     /**
      * @return datapathId
index 2b99fc99d6ebb19beaee2348f3b3263a55c582cc..9bc4c8837ab3a355d6dea27f37ec2dcf217c1621 100644 (file)
@@ -23,5 +23,6 @@ public interface RoleManager extends
         AutoCloseable,
         DeviceTerminationPhaseHandler {
 
-    CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo, final int numRetries);
+    CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo);
+
 }
index 986bbb178e4fae8ecac206b4c1c4c5095601a1c9..e0ff00efe0424d2feb40d49fb8a5de559688e098 100644 (file)
@@ -108,31 +108,19 @@ public class ConnectionContextImpl implements ConnectionContext {
 
     @Override
     public void closeConnection(final boolean propagate) {
-        if (null == nodeId){
+        if (Objects.isNull(nodeId)){
             SessionStatistics.countEvent(connectionAdapter.getRemoteAddress().toString(), SessionStatistics.ConnectionStatus.CONNECTION_DISCONNECTED_BY_OFP);
         } else {
             SessionStatistics.countEvent(nodeId.toString(), SessionStatistics.ConnectionStatus.CONNECTION_DISCONNECTED_BY_OFP);
         }
-        final BigInteger datapathId = featuresReply != null ? featuresReply.getDatapathId() : BigInteger.ZERO;
-        LOG.debug("Actively closing connection: {}, datapathId: {}",
-                connectionAdapter.getRemoteAddress(), datapathId);
-        connectionState = ConnectionContext.CONNECTION_STATE.RIP;
-
-        Future<Void> future = Executors.newSingleThreadExecutor().submit(new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                unregisterOutboundQueue();
-                return null;
-            }
-        });
-        try {
-            future.get(1, TimeUnit.SECONDS);
-            LOG.info("Unregister outbound queue successful.");
-        } catch (InterruptedException | TimeoutException | ExecutionException e) {
-            LOG.warn("Unregister outbound queue throws exception for node {} ", getSafeNodeIdForLOG());
-            LOG.trace("Unregister outbound queue throws exception for node {} ", getSafeNodeIdForLOG(), e);
+        final BigInteger datapathId = Objects.nonNull(featuresReply) ? featuresReply.getDatapathId() : BigInteger.ZERO;
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Actively closing connection: {}, datapathId: {}",
+                    connectionAdapter.getRemoteAddress(), datapathId);
         }
+        connectionState = ConnectionContext.CONNECTION_STATE.RIP;
 
+        unregisterOutboundQueue();
         closeHandshakeContext();
 
         if (getConnectionAdapter().isAlive()) {
@@ -140,10 +128,7 @@ public class ConnectionContextImpl implements ConnectionContext {
         }
 
         if (propagate) {
-            LOG.debug("Propagating device disconnect for node {}", getSafeNodeIdForLOG());
             propagateDeviceDisconnectedEvent();
-        } else {
-            LOG.debug("Close connection without propagating for node {}", getSafeNodeIdForLOG());
         }
     }
 
@@ -190,10 +175,12 @@ public class ConnectionContextImpl implements ConnectionContext {
     }
 
     private void propagateDeviceDisconnectedEvent() {
-        if (null != deviceDisconnectedHandler) {
+        if (Objects.nonNull(deviceDisconnectedHandler)) {
             final BigInteger datapathId = featuresReply != null ? featuresReply.getDatapathId() : BigInteger.ZERO;
-            LOG.debug("Propagating connection closed event: {}, datapathId:{}.",
-                    connectionAdapter.getRemoteAddress(), datapathId);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Propagating connection closed event: {}, datapathId:{}.",
+                        connectionAdapter.getRemoteAddress(), datapathId);
+            }
             deviceDisconnectedHandler.onDeviceDisconnected(this);
         }
     }
@@ -204,7 +191,7 @@ public class ConnectionContextImpl implements ConnectionContext {
      */
     @Override
     public String getSafeNodeIdForLOG() {
-        return null == nodeId ? "null" : nodeId.getValue();
+        return Objects.nonNull(nodeId) ? nodeId.getValue() : "null";
     }
 
     @Override
@@ -213,7 +200,9 @@ public class ConnectionContextImpl implements ConnectionContext {
     }
 
     private void unregisterOutboundQueue() {
-        LOG.debug("Trying unregister outbound queue handler registration for node {}", nodeId);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Trying unregister outbound queue handler registration for node {}", nodeId);
+        }
         if (outboundQueueHandlerRegistration != null) {
             outboundQueueHandlerRegistration.close();
             outboundQueueHandlerRegistration = null;
@@ -323,7 +312,7 @@ public class ConnectionContextImpl implements ConnectionContext {
         }
 
         @Override
-        public Short getVersion() {
+        public short getVersion() {
             return version;
         }
 
index 49d0c2d463623a2e0d21ad8832ec7dcecb0205a0..8c03adc94bf8a8335c8fef96c9b51dc0dee3d88a 100644 (file)
@@ -18,9 +18,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * Created by Martin Bobak &lt;mbobak@cisco.com&gt; on 12.5.2015.
- */
 public class OutboundQueueProviderImpl implements OutboundQueueProvider {
     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueProviderImpl.class);
     private final short ofVersion;
index 82ad28a2b15d547d103f744de9ee1706117bb03b..e12549da933dec9045318a9ede85383eaa0ee52f 100644 (file)
@@ -50,8 +50,10 @@ public class HandshakeListenerImpl implements HandshakeListener {
 
     @Override
     public void onHandshakeSuccessful(final GetFeaturesOutput featureOutput, final Short version) {
-        LOG.debug("handshake succeeded: {}", connectionContext.getConnectionAdapter().getRemoteAddress());
-        closeHandshakeContext();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("handshake succeeded: {}", connectionContext.getConnectionAdapter().getRemoteAddress());
+        }
+        this.handshakeContext.close();
         connectionContext.changeStateToWorking();
         connectionContext.setFeatures(featureOutput);
         connectionContext.setNodeId(InventoryDataServiceUtil.nodeIdFromDatapathId(featureOutput.getDatapathId()));
@@ -66,7 +68,9 @@ public class HandshakeListenerImpl implements HandshakeListener {
         return new FutureCallback<RpcResult<BarrierOutput>>() {
             @Override
             public void onSuccess(@Nullable final RpcResult<BarrierOutput> result) {
-                LOG.debug("succeeded by getting sweep barrier after post-handshake for device {}", connectionContext.getNodeId().getValue());
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("succeeded by getting sweep barrier after post-handshake for device {}", connectionContext.getNodeId().getValue());
+                }
                 try {
                     ConnectionStatus connectionStatusResult = deviceConnectedHandler.deviceConnected(connectionContext);
                     if (!ConnectionStatus.MAY_CONTINUE.equals(connectionStatusResult)) {
@@ -90,29 +94,22 @@ public class HandshakeListenerImpl implements HandshakeListener {
         };
     }
 
-    protected ListenableFuture<RpcResult<BarrierOutput>> fireBarrier(final Short version, final long xid) {
+    private ListenableFuture<RpcResult<BarrierOutput>> fireBarrier(final Short version, final long xid) {
         final BarrierInput barrierInput = new BarrierInputBuilder()
                 .setXid(xid)
                 .setVersion(version)
                 .build();
         return JdkFutureAdapters.listenInPoolThread(
-                connectionContext.getConnectionAdapter().barrier(barrierInput));
+                this.connectionContext.getConnectionAdapter().barrier(barrierInput));
     }
 
     @Override
     public void onHandshakeFailure() {
-        LOG.debug("handshake failed: {}", connectionContext.getConnectionAdapter().getRemoteAddress());
-        closeHandshakeContext();
-        connectionContext.closeConnection(false);
-    }
-
-    private void closeHandshakeContext() {
-        try {
-            handshakeContext.close();
-        } catch (final Exception e) {
-            LOG.error("Closing handshake context failed: {}", e.getMessage());
-            LOG.debug("Detail in handshake context close: {}", e);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("handshake failed: {}", this.connectionContext.getConnectionAdapter().getRemoteAddress());
         }
+        this.handshakeContext.close();
+        this.connectionContext.closeConnection(false);
     }
 
     @Override
index 6212bf03027e47f3c17a64e181a38a53aad6ca3c..bf9b2b548a97c156be1197270323d1640adc63de 100644 (file)
@@ -18,6 +18,7 @@ import java.math.BigInteger;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
@@ -112,15 +113,16 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     private static final float LOW_WATERMARK_FACTOR = 0.75f;
     // TODO: high water mark factor should be parametrized
     private static final float HIGH_WATERMARK_FACTOR = 0.95f;
+    private boolean initialized;
 
     private ConnectionContext primaryConnectionContext;
     private final DeviceState deviceState;
     private final DataBroker dataBroker;
     private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
-    private final TransactionChainManager transactionChainManager;
-    private final DeviceFlowRegistry deviceFlowRegistry;
-    private final DeviceGroupRegistry deviceGroupRegistry;
-    private final DeviceMeterRegistry deviceMeterRegistry;
+    private TransactionChainManager transactionChainManager;
+    private DeviceFlowRegistry deviceFlowRegistry;
+    private DeviceGroupRegistry deviceGroupRegistry;
+    private DeviceMeterRegistry deviceMeterRegistry;
     private final PacketInRateLimiter packetInLimiter;
     private final MessageSpy messageSpy;
     private final ItemLifeCycleKeeper flowLifeCycleKeeper;
@@ -140,7 +142,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     private volatile CONTEXT_STATE state;
     private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
 
-    public DeviceContextImpl(
+    DeviceContextImpl(
             @Nonnull final ConnectionContext primaryConnectionContext,
             @Nonnull final DataBroker dataBroker,
             @Nonnull final MessageSpy messageSpy,
@@ -148,41 +150,34 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
             @Nonnull final DeviceManager manager,
             final ConvertorExecutor convertorExecutor,
             final boolean skipTableFeatures) {
-        this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
+        this.primaryConnectionContext = primaryConnectionContext;
         this.deviceInfo = primaryConnectionContext.getDeviceInfo();
         this.deviceState = new DeviceStateImpl();
-        this.dataBroker = Preconditions.checkNotNull(dataBroker);
-        this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo);
-        auxiliaryConnectionContexts = new HashMap<>();
-        deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker, deviceInfo.getNodeInstanceIdentifier());
-        deviceGroupRegistry = new DeviceGroupRegistryImpl();
-        deviceMeterRegistry = new DeviceMeterRegistryImpl();
+        this.dataBroker = dataBroker;
+        this.auxiliaryConnectionContexts = new HashMap<>();
         this.messageSpy = Preconditions.checkNotNull(messageSpy);
-        this.deviceManager = Preconditions.checkNotNull(manager);
+        this.deviceManager = manager;
 
-        packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
+        this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
                 /*initial*/ 1000, /*initial*/2000, this.messageSpy, REJECTED_DRAIN_FACTOR);
 
         this.translatorLibrary = translatorLibrary;
-        portStatusTranslator = translatorLibrary.lookupTranslator(
+        this.portStatusTranslator = translatorLibrary.lookupTranslator(
                 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
-        packetInTranslator = translatorLibrary.lookupTranslator(
+        this.packetInTranslator = translatorLibrary.lookupTranslator(
                 new TranslatorKey(deviceInfo.getVersion(), PacketIn.class.getName()));
-        flowRemovedTranslator = translatorLibrary.lookupTranslator(
+        this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
                 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
 
-        itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
-        flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
-        itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
+        this.itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
+        this.flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
+        this.itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
         this.state = CONTEXT_STATE.INITIALIZATION;
         this.convertorExecutor = convertorExecutor;
         this.skipTableFeatures = skipTableFeatures;
+        this.initialized = false;
     }
 
-    /**
-     * This method is called from {@link DeviceManagerImpl} only. So we could say "posthandshake process finish"
-     * and we are able to set a scheduler for an automatic transaction submitting by time (0,5sec).
-     */
     @Override
     public void initialSubmitTransaction() {
         transactionChainManager.initialSubmitWriteTransaction();
@@ -220,24 +215,30 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
                                                           final InstanceIdentifier<T> path,
                                                           final T data){
-        transactionChainManager.writeToTransaction(store, path, data, false);
+        if (Objects.nonNull(transactionChainManager)) {
+            transactionChainManager.writeToTransaction(store, path, data, false);
+        }
     }
 
     @Override
     public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
                                                                          final InstanceIdentifier<T> path,
                                                                          final T data){
-        transactionChainManager.writeToTransaction(store, path, data, true);
+        if (Objects.nonNull(transactionChainManager)) {
+            transactionChainManager.writeToTransaction(store, path, data, true);
+        }
     }
 
     @Override
     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier<T> path) throws TransactionChainClosedException {
-        transactionChainManager.addDeleteOperationTotTxChain(store, path);
+        if (Objects.nonNull(transactionChainManager)) {
+            transactionChainManager.addDeleteOperationTotTxChain(store, path);
+        }
     }
 
     @Override
     public boolean submitTransaction() {
-        return transactionChainManager.submitWriteTransaction();
+        return Objects.nonNull(transactionChainManager) && transactionChainManager.submitWriteTransaction();
     }
 
     @Override
@@ -246,7 +247,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     }
 
     @Override
-    public ConnectionContext getAuxiliaryConnectiobContexts(final BigInteger cookie) {
+    public ConnectionContext getAuxiliaryConnectionContexts(final BigInteger cookie) {
         return auxiliaryConnectionContexts.get(new SwitchConnectionCookieOFImpl(cookie.longValue()));
     }
 
@@ -492,12 +493,13 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
 
     @Override
     public synchronized void shutdownConnection() {
-        LOG.debug("Shutdown method for node {}", getDeviceInfo().getLOGValue());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Shutdown method for node {}", getDeviceInfo().getLOGValue());
+        }
         if (CONTEXT_STATE.TERMINATION.equals(getState())) {
             LOG.debug("DeviceCtx for Node {} is in termination process.", getDeviceInfo().getLOGValue());
             return;
         }
-        setState(CONTEXT_STATE.TERMINATION);
 
         if (ConnectionContext.CONNECTION_STATE.RIP.equals(getPrimaryConnectionContext().getConnectionState())) {
             LOG.debug("ConnectionCtx for Node {} is in RIP state.", getDeviceInfo().getLOGValue());
@@ -518,7 +520,11 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
 
     @Override
     public ListenableFuture<Void> shuttingDownDataStoreTransactions() {
-        return transactionChainManager.shuttingDown();
+        ListenableFuture<Void> future = Futures.immediateFuture(null);
+        if (Objects.nonNull(this.transactionChainManager)) {
+            future = this.transactionChainManager.shuttingDown();
+        }
+        return future;
     }
 
     @VisibleForTesting
@@ -543,7 +549,11 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
 
     @Override
     public ListenableFuture<Void> stopClusterServices(boolean deviceDisconnected) {
-        return this.transactionChainManager.deactivateTransactionManager();
+        ListenableFuture<Void> future = Futures.immediateFuture(null);
+        if (Objects.nonNull(this.transactionChainManager)) {
+            future = this.transactionChainManager.deactivateTransactionManager();
+        }
+        return future;
     }
 
     @Override
@@ -558,7 +568,9 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
 
     @Override
     public void putLifecycleServiceIntoTxChainManager(final LifecycleService lifecycleService){
-        this.transactionChainManager.setLifecycleService(lifecycleService);
+        if (Objects.nonNull(this.transactionChainManager)) {
+            this.transactionChainManager.setLifecycleService(lifecycleService);
+        }
     }
 
     @Override
@@ -589,6 +601,8 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
 
         LOG.info("Starting device context cluster services for node {}", deviceInfo.getLOGValue());
 
+        lazyTransactionManagerInitialiaztion();
+
         this.transactionChainManager.activateTransactionManager();
 
         try {
@@ -600,4 +614,18 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
 
         return this.clusterInitializationPhaseHandler.onContextInstantiateService(getPrimaryConnectionContext());
     }
+
+    @VisibleForTesting
+    void lazyTransactionManagerInitialiaztion() {
+        if (!this.initialized) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Transaction chain manager for node {} created", deviceInfo.getLOGValue());
+            }
+            this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo);
+            this.deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker, deviceInfo.getNodeInstanceIdentifier());
+            this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
+            this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
+            this.initialized = true;
+        }
+    }
 }
index 31795d68e1abe9f158217e9aedf9b14a849e625b..f3b14296e7495a33d5d977e6d42ffc2f4295deb2 100644 (file)
@@ -18,6 +18,7 @@ import io.netty.util.TimerTask;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -102,16 +103,10 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
                              final ConvertorExecutor convertorExecutor,
                              final boolean skipTableFeatures) {
 
-        this.switchFeaturesMandatory = switchFeaturesMandatory;
-        this.globalNotificationQuota = globalNotificationQuota;
-        this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
-        this.skipTableFeatures = skipTableFeatures;
-        this.dataBroker = Preconditions.checkNotNull(dataBroker);
-        this.convertorExecutor = convertorExecutor;
-        this.hashedWheelTimer = hashedWheelTimer;
+        this.dataBroker = dataBroker;
+
         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
-
         final NodesBuilder nodesBuilder = new NodesBuilder();
         nodesBuilder.setNode(Collections.<Node>emptyList());
         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
@@ -122,10 +117,15 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
             throw new IllegalStateException(e);
         }
 
+        this.switchFeaturesMandatory = switchFeaturesMandatory;
+        this.globalNotificationQuota = globalNotificationQuota;
+        this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
+        this.skipTableFeatures = skipTableFeatures;
+        this.convertorExecutor = convertorExecutor;
+        this.hashedWheelTimer = hashedWheelTimer;
         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
         this.barrierCountLimit = barrierCountLimit;
-
-        spyPool = new ScheduledThreadPoolExecutor(1);
+        this.spyPool = new ScheduledThreadPoolExecutor(1);
         this.singletonServiceProvider = singletonServiceProvider;
         this.notificationPublishService = notificationPublishService;
         this.messageSpy = messageSpy;
@@ -158,7 +158,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
          */
          if (deviceContexts.containsKey(deviceInfo)) {
              DeviceContext deviceContext = deviceContexts.get(deviceInfo);
-             LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo);
+             LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo.getLOGValue());
              if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
                  LOG.warn("Node {} context state not in TERMINATION state.",
                          connectionContext.getDeviceInfo().getLOGValue());
@@ -195,13 +195,13 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
                 convertorExecutor,
                 skipTableFeatures);
 
-        deviceContexts.putIfAbsent(deviceInfo, deviceContext);
+        deviceContexts.put(deviceInfo, deviceContext);
 
         final LifecycleService lifecycleService = new LifecycleServiceImpl();
         lifecycleService.setDeviceContext(deviceContext);
         deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
 
-        lifecycleServices.putIfAbsent(deviceInfo, lifecycleService);
+        lifecycleServices.put(deviceInfo, lifecycleService);
 
         deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
 
@@ -226,7 +226,9 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
                 if (freshNotificationLimit < 100) {
                     freshNotificationLimit = 100;
                 }
-                LOG.debug("fresh notification limit = {}", freshNotificationLimit);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("fresh notification limit = {}", freshNotificationLimit);
+                }
                 for (final DeviceContext deviceContext : deviceContexts.values()) {
                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
                 }
@@ -253,10 +255,9 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
             deviceCtx.shuttingDownDataStoreTransactions();
         }
 
-        if (spyPool != null) {
-            spyPool.shutdownNow();
-            spyPool = null;
-        }
+        Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
+        spyPool = null;
+
     }
 
     @Override
@@ -316,7 +317,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
         }
 
         if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
-            LOG.debug("Device context for node {} is already is termination state, waiting for close all context");
+            LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
             return;
         }
 
@@ -328,33 +329,31 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
         }
         //TODO: Auxiliary connections supported ?
-        {
             /* Device is disconnected and so we need to close TxManager */
-            final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
-            Futures.addCallback(future, new FutureCallback<Void>() {
+        final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
+        Futures.addCallback(future, new FutureCallback<Void>() {
 
-                @Override
-                public void onSuccess(final Void result) {
-                    LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
-                    deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
-                }
+            @Override
+            public void onSuccess(final Void result) {
+                LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
+                deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
+            }
 
-                @Override
-                public void onFailure(final Throwable t) {
-                    LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
-                    LOG.trace("TxChainManager failed by closing. ", t);
-                    deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
-                }
-            });
-            /* Add timer for Close TxManager because it could fain ind cluster without notification */
-            final TimerTask timerTask = timeout -> {
-                if (!future.isDone()) {
-                    LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
-                    future.cancel(false);
-                }
-            };
-            hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
-        }
+            @Override
+            public void onFailure(final Throwable t) {
+                LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
+                LOG.trace("TxChainManager failed by closing. ", t);
+                deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
+            }
+        });
+        /* Add timer for Close TxManager because it could fain ind cluster without notification */
+        final TimerTask timerTask = timeout -> {
+            if (!future.isDone()) {
+                LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
+                future.cancel(false);
+            }
+        };
+        hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
     }
 
     @VisibleForTesting
index e15fc936b80fd339f4a257317a40d7cc607898aa..d7ec195a11c68df293419b480fe899bb0083441e 100644 (file)
@@ -13,12 +13,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply;
 
 /**
- * openflowplugin-impl
- * org.opendaylight.openflowplugin.impl.device
- * <p/>
- * DeviceState is builded from {@link FeaturesReply} and {@link NodeId}. Both values are inside
- * {@link org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext}
- *
+ * Holder for device features
  */
 class DeviceStateImpl implements DeviceState {
 
@@ -29,7 +24,7 @@ class DeviceStateImpl implements DeviceState {
     private boolean portStatisticsAvailable;
     private boolean queueStatisticsAvailable;
 
-    public DeviceStateImpl() {
+    DeviceStateImpl() {
     }
 
     @Override
index dcb7f70bbb8ca2237745298d22667f81cc0de3a2..0912b37ec75a4d2fb29be16bf394800b6fa51ced 100644 (file)
@@ -14,8 +14,8 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CancellationException;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -31,12 +31,8 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListen
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,10 +48,11 @@ import org.slf4j.LoggerFactory;
 class TransactionChainManager implements TransactionChainListener, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
+    private static final String CANNOT_WRITE_INTO_TRANSACTION = "Cannot write into transaction.";
 
     private final Object txLock = new Object();
-    private final KeyedInstanceIdentifier<Node, NodeKey> nodeII;
     private final DataBroker dataBroker;
+    private final String nodeId;
     private LifecycleService lifecycleService;
 
     @GuardedBy("txLock")
@@ -74,22 +71,15 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
 
     TransactionChainManager(@Nonnull final DataBroker dataBroker,
                             @Nonnull final DeviceInfo deviceInfo) {
-        this.dataBroker = Preconditions.checkNotNull(dataBroker);
-        this.nodeII = deviceInfo.getNodeInstanceIdentifier();
+        this.dataBroker = dataBroker;
+        this.nodeId = deviceInfo.getNodeInstanceIdentifier().getKey().getId().getValue();
         this.transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
-        lastSubmittedFuture = Futures.immediateFuture(null);
-        LOG.debug("created txChainManager for {}", this.nodeII);
-    }
-
-    private NodeId nodeId() {
-        return nodeII.getKey().getId();
+        this.lastSubmittedFuture = Futures.immediateFuture(null);
     }
 
     @GuardedBy("txLock")
     private void createTxChain() {
-        if (txChainFactory != null) {
-            txChainFactory.close();
-        }
+        Optional.ofNullable(txChainFactory).ifPresent(TransactionChain::close);
         txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
     }
 
@@ -108,18 +98,17 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
      * transactions. Call this method for MASTER role only.
      */
     void activateTransactionManager() {
-        LOG.trace("activateTransactionManager for node {} transaction submit is set to {}", nodeId(), submitIsEnabled);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("activateTransactionManager for node {} transaction submit is set to {}", this.nodeId, submitIsEnabled);
+        }
         synchronized (txLock) {
             if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) {
-                LOG.debug("Transaction Factory create {}", nodeId());
                 Preconditions.checkState(txChainFactory == null, "TxChainFactory survive last close.");
                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
                 this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
                 this.submitIsEnabled = false;
                 this.initCommit = true;
                 createTxChain();
-            } else {
-                LOG.debug("Transaction is active {}", nodeId());
             }
         }
     }
@@ -131,25 +120,24 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
      * @return Future
      */
     ListenableFuture<Void> deactivateTransactionManager() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("deactivateTransactionManager for node {}", this.nodeId);
+        }
         final ListenableFuture<Void> future;
         synchronized (txLock) {
             if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
-                LOG.debug("Submitting all transactions if we were in status WORKING for Node {}", nodeId());
                 transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
                 future = txChainShuttingDown();
                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
-                LOG.debug("Transaction Factory deactivate for Node {}", nodeId());
                 Futures.addCallback(future, new FutureCallback<Void>() {
                     @Override
                     public void onSuccess(final Void result) {
-                        txChainFactory.close();
-                        txChainFactory = null;
+                        removeTxChainFactory();
                     }
 
                     @Override
                     public void onFailure(final Throwable t) {
-                        txChainFactory.close();
-                        txChainFactory = null;
+                        removeTxChainFactory();
                     }
                 });
             } else {
@@ -160,18 +148,27 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
         return future;
     }
 
+    private void removeTxChainFactory() {
+        Optional.ofNullable(txChainFactory).ifPresent(TransactionChain::close);
+        txChainFactory = null;
+    }
+
     boolean submitWriteTransaction() {
         synchronized (txLock) {
             if (!submitIsEnabled) {
-                LOG.trace("transaction not committed - submit block issued");
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("transaction not committed - submit block issued");
+                }
                 return false;
             }
-            if (wTx == null) {
-                LOG.trace("nothing to commit - submit returns true");
+            if (Objects.isNull(wTx)) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("nothing to commit - submit returns true");
+                }
                 return true;
             }
             Preconditions.checkState(TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus),
-                    "we have here Uncompleted Transaction for node {} and we are not MASTER", nodeII);
+                    "we have here Uncompleted Transaction for node {} and we are not MASTER", this.nodeId);
             final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = wTx.submit();
             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
                 @Override
@@ -194,11 +191,8 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
                         }
                     }
                     if (initCommit) {
-                        LOG.warn("Initial commit failed. ", t);
                         wTx = null;
-                        if (Objects.nonNull(lifecycleService)) {
-                            lifecycleService.closeConnection();
-                        }
+                        Optional.ofNullable(lifecycleService).ifPresent(LifecycleService::closeConnection);
                     }
                 }
             });
@@ -211,12 +205,13 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
     <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
                                                              final InstanceIdentifier<T> path){
         final WriteTransaction writeTx = getTransactionSafely();
-        if (writeTx != null) {
-            LOG.trace("addDeleteOperation called with path {} ", path);
+        if (Objects.nonNull(writeTx)) {
             writeTx.delete(store, path);
         } else {
-            LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", nodeII, path);
-            throw new TransactionChainClosedException("Cannot write into transaction.");
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", this.nodeId, path);
+            }
+            throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
         }
     }
 
@@ -225,12 +220,13 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
                                                    final T data,
                                                    final boolean createParents){
         final WriteTransaction writeTx = getTransactionSafely();
-        if (writeTx != null) {
-            LOG.trace("writeToTransaction called with path {} ", path);
+        if (Objects.nonNull(writeTx)) {
             writeTx.put(store, path, data, createParents);
         } else {
-            LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", nodeII, path);
-            throw new TransactionChainClosedException("Cannot write into transaction.");
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", this.nodeId, path);
+            }
+            throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
         }
     }
 
@@ -259,9 +255,7 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
     private WriteTransaction getTransactionSafely() {
             synchronized (txLock) {
                 if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
-                    if (wTx == null && txChainFactory != null) {
-                        wTx = txChainFactory.newWriteOnlyTransaction();
-                    }
+                    Optional.ofNullable(txChainFactory).ifPresent(bindingTransactionChain -> wTx = txChainFactory.newWriteOnlyTransaction());
                 }
             }
         return wTx;
@@ -276,7 +270,9 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
     }
 
     ListenableFuture<Void> shuttingDown() {
-        LOG.debug("TxManager is going SHUTTING_DOWN for node {}", nodeII.getKey().getId().getValue());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("TxManager is going SHUTTING_DOWN for node {}", this.nodeId);
+        }
         ListenableFuture<Void> future;
         synchronized (txLock) {
             this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
@@ -296,6 +292,9 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
             // hijack md-sal thread
             future = lastSubmittedFuture;
         } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Submitting all transactions for Node {}", this.nodeId);
+            }
             // hijack md-sal thread
             future = wTx.submit();
             wTx = null;
@@ -305,17 +304,12 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
 
     @Override
     public void close() {
-        LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}, will wait for ownershipservice to notify"
-                , nodeII);
-        Preconditions.checkState(TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus));
-        Preconditions.checkState(wTx == null);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}", this.nodeId);
+        }
         synchronized (txLock) {
-            if (txChainFactory != null) {
-                txChainFactory.close();
-                txChainFactory = null;
-            }
+            removeTxChainFactory();
         }
-        Preconditions.checkState(txChainFactory == null);
     }
 
     private enum TransactionChainManagerStatus {
index 0a1f81095baed3b82ea9f2da1522271bd1a2472c..0af2547c2144b9431dff4dd5d1db59396a0e9d6b 100644 (file)
@@ -16,6 +16,7 @@ import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.TimerTask;
+
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -27,10 +28,10 @@ import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
-import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
@@ -48,7 +49,7 @@ class RoleContextImpl implements RoleContext {
 
     private static final Logger LOG = LoggerFactory.getLogger(RoleContextImpl.class);
     // Maximum limit of timeout retries when cleaning DS, to prevent infinite recursive loops
-    private static final int MAX_CLEAN_DS_RETRIES = 3;
+    private static final int MAX_CLEAN_DS_RETRIES = 0;
 
     private SalRoleService salRoleService = null;
     private final HashedWheelTimer hashedWheelTimer;
@@ -56,14 +57,17 @@ class RoleContextImpl implements RoleContext {
     private CONTEXT_STATE state;
     private final RoleManager myManager;
     private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
+    private final LifecycleService lifecycleService;
 
     RoleContextImpl(final DeviceInfo deviceInfo,
                     final HashedWheelTimer hashedWheelTimer,
-                    final RoleManager myManager) {
+                    final RoleManager myManager,
+                    final LifecycleService lifecycleService) {
         this.deviceInfo = deviceInfo;
         state = CONTEXT_STATE.WORKING;
         this.myManager = myManager;
         this.hashedWheelTimer = hashedWheelTimer;
+        this.lifecycleService = lifecycleService;
     }
 
     @Nullable
@@ -114,6 +118,7 @@ class RoleContextImpl implements RoleContext {
             @Override
             public void onFailure(final Throwable throwable) {
                 LOG.warn("Was not able to set MASTER role on device, node {}", deviceInfo.getLOGValue());
+                lifecycleService.closeConnection();
             }
         });
     }
@@ -142,13 +147,12 @@ class RoleContextImpl implements RoleContext {
                 public void onFailure(final Throwable throwable) {
                     LOG.warn("Was not able to set role SLAVE to device on node {} ", deviceInfo.getLOGValue());
                     LOG.trace("Error occurred on device role setting, probably connection loss: ", throwable);
-                    myManager.removeDeviceFromOperationalDS(deviceInfo, MAX_CLEAN_DS_RETRIES);
-
+                    myManager.removeDeviceFromOperationalDS(deviceInfo);
                 }
             });
             return future;
         } else {
-            return myManager.removeDeviceFromOperationalDS(deviceInfo, MAX_CLEAN_DS_RETRIES);
+            return myManager.removeDeviceFromOperationalDS(deviceInfo);
         }
     }
 
@@ -159,27 +163,24 @@ class RoleContextImpl implements RoleContext {
 
     @VisibleForTesting
     ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
-        LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId());
-        final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
-        final Short version = deviceInfo.getVersion();
-        if (null == version) {
-            LOG.debug("Device version is null");
-            return Futures.immediateFuture(null);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId());
         }
-        if (version < OFConstants.OFP_VERSION_1_3) {
-            LOG.debug("Device version not support ROLE");
-            return Futures.immediateFuture(null);
-        } else {
+        final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
+        if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) {
             final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
-                    .setNode(new NodeRef(DeviceStateUtil.createNodeInstanceIdentifier(deviceInfo.getNodeId()))).build();
+                    .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())).build();
             setRoleOutputFuture = this.salRoleService.setRole(setRoleInput);
             final TimerTask timerTask = timeout -> {
                 if (!setRoleOutputFuture.isDone()) {
-                    LOG.warn("New role {} was not propagated to device {} during 10 sec", newRole, deviceInfo.getLOGValue());
+                    LOG.warn("New role {} was not propagated to device {} during 5 sec", newRole, deviceInfo.getLOGValue());
                     setRoleOutputFuture.cancel(true);
                 }
             };
-            hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
+            hashedWheelTimer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
+        } else {
+            LOG.info("Device: {} with version: {} does not support role", deviceInfo.getLOGValue(), deviceInfo.getVersion());
+            return Futures.immediateFuture(null);
         }
         return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
     }
@@ -208,6 +209,7 @@ class RoleContextImpl implements RoleContext {
             @Override
             public void onFailure(final Throwable throwable) {
                 LOG.warn("Was not able to set MASTER role on device, node {}", deviceInfo.getLOGValue());
+                lifecycleService.closeConnection();
             }
         });
 
index 4a1ca41033c3d4d55fc66e9395572d7801d5c475..28191b5d63bc40951ffcfc79fe27b9f246ba57cb 100644 (file)
@@ -33,7 +33,6 @@ import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
 import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
-import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
@@ -70,7 +69,7 @@ public class RoleManagerImpl implements RoleManager {
     @Override
     public void onDeviceContextLevelUp(@CheckForNull final DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
         final DeviceContext deviceContext = Preconditions.checkNotNull(lifecycleService.getDeviceContext());
-        final RoleContext roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, this);
+        final RoleContext roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, this, lifecycleService);
         roleContext.setSalRoleService(new SalRoleServiceImpl(roleContext, deviceContext));
         Verify.verify(contexts.putIfAbsent(deviceInfo, roleContext) == null, "Role context for master Node %s is still not closed.", deviceInfo.getLOGValue());
         Futures.addCallback(roleContext.makeDeviceSlave(), new FutureCallback<RpcResult<SetRoleOutput>>() {
@@ -84,6 +83,7 @@ public class RoleManagerImpl implements RoleManager {
                     @Override
                     public void onFailure(Throwable throwable) {
                         LOG.warn("Was not able to set role SLAVE to device on node {} ",deviceInfo.getLOGValue());
+                        lifecycleService.closeConnection();
                     }
                 });
         lifecycleService.setRoleContext(roleContext);
@@ -97,7 +97,7 @@ public class RoleManagerImpl implements RoleManager {
             // got here because last known role is LEADER and DS might need clearing up
             final RoleContext roleContext = iterator.next();
             contexts.remove(roleContext.getDeviceInfo());
-            removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
+            removeDeviceFromOperationalDS(roleContext.getDeviceInfo());
         }
     }
 
@@ -108,33 +108,22 @@ public class RoleManagerImpl implements RoleManager {
     }
 
     @Override
-    public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo, final int numRetries) {
+    public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
         final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
-        delWtx.delete(LogicalDatastoreType.OPERATIONAL, DeviceStateUtil.createNodeInstanceIdentifier(deviceInfo.getNodeId()));
+        delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier());
         final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
 
         Futures.addCallback(delFuture, new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
-                LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
-                contexts.remove(deviceInfo);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
+                }
             }
 
             @Override
             public void onFailure(@Nonnull final Throwable t) {
-                // If we have any retries left, we will try to clean the datastore again
-                if (numRetries > 0) {
-                    // We "used" one retry here, so decrement it
-                    final int curRetries = numRetries - 1;
-                    LOG.debug("Delete node {} failed with exception {}. Trying again (retries left: {})", deviceInfo.getLOGValue(), t, curRetries);
-                    // Recursive call to this method with "one less" retry
-                    removeDeviceFromOperationalDS(deviceInfo, curRetries);
-                    return;
-                }
-
-                // No retries left, so we will just close the role context, and ignore datastore cleanup
-                LOG.warn("Delete node {} failed with exception {}. No retries left, aborting", deviceInfo.getLOGValue(), t);
-                contexts.remove(deviceInfo);
+                LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t);
             }
         });
 
index c4ab0e3b2c967efe66fc20235146167798d10b53..773be3c3d2d5359f11460e43afc4ae56df926af7 100644 (file)
@@ -9,7 +9,7 @@
 package org.opendaylight.openflowplugin.impl.rpc.listener;
 
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.Identifiable;
@@ -24,19 +24,19 @@ import org.slf4j.LoggerFactory;
 public class ItemLifecycleListenerImpl implements ItemLifecycleListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(ItemLifecycleListenerImpl.class);
-    public static final String NOT_ABLE_TO_WRITE_TO_TRANSACTION = "Not able to write to transaction: {}";
+    private static final String NOT_ABLE_TO_WRITE_TO_TRANSACTION = "Not able to write to transaction: ";
 
-    private final DeviceContext deviceContext;
+    private final TxFacade txFacade;
 
-    public ItemLifecycleListenerImpl(DeviceContext deviceContext) {
-        this.deviceContext = deviceContext;
+    public ItemLifecycleListenerImpl(final TxFacade txFacade) {
+        this.txFacade = txFacade;
     }
 
     @Override
     public <I extends Identifiable<K> & DataObject, K extends Identifier<I>> void onAdded(KeyedInstanceIdentifier<I, K> itemPath, I itemBody) {
         try {
-            deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, itemPath, itemBody);
-            deviceContext.submitTransaction();
+            txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, itemPath, itemBody);
+            txFacade.submitTransaction();
         } catch (Exception e) {
             LOG.warn(NOT_ABLE_TO_WRITE_TO_TRANSACTION, e);
         }
@@ -45,8 +45,8 @@ public class ItemLifecycleListenerImpl implements ItemLifecycleListener {
     @Override
     public <I extends Identifiable<K> & DataObject, K extends Identifier<I>> void onRemoved(KeyedInstanceIdentifier<I, K> itemPath) {
         try {
-            deviceContext.addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, itemPath);
-            deviceContext.submitTransaction();
+            txFacade.addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, itemPath);
+            txFacade.submitTransaction();
         } catch (Exception e) {
             LOG.warn(NOT_ABLE_TO_WRITE_TO_TRANSACTION, e);
         }
@@ -55,9 +55,9 @@ public class ItemLifecycleListenerImpl implements ItemLifecycleListener {
     @Override
     public <I extends Identifiable<K> & DataObject, K extends Identifier<I>> void onUpdated(KeyedInstanceIdentifier<I, K> itemPath, I itemBody) {
         try {
-            deviceContext.addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, itemPath);
-            deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, itemPath, itemBody);
-            deviceContext.submitTransaction();
+            txFacade.addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, itemPath);
+            txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, itemPath, itemBody);
+            txFacade.submitTransaction();
         } catch (Exception e) {
             LOG.warn(NOT_ABLE_TO_WRITE_TO_TRANSACTION, e);
         }
index 8ebd6cd614085425a630792bfe8fc0686d476361..01cb6d0b14a94a98fcc0531b2712ef4cb21cbd4b 100644 (file)
@@ -107,11 +107,15 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
                         final DeviceInfo deviceInfo) {
 
         if (!statisticsContext.isSchedulingEnabled()) {
-            LOG.debug("Disabled statistics scheduling for device: {}", deviceInfo.getNodeId().getValue());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Disabled statistics scheduling for device: {}", deviceInfo.getNodeId().getValue());
+            }
             return;
         }
 
-        LOG.debug("POLLING ALL STATISTICS for device: {}", deviceInfo.getNodeId());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("POLLING ALL STATISTICS for device: {}", deviceInfo.getNodeId());
+        }
         timeCounter.markStart();
         final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
         Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
@@ -125,14 +129,20 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
             @Override
             public void onFailure(@Nonnull final Throwable throwable) {
                 timeCounter.addTimeMark();
-                LOG.warn("Statistics gathering for single node was not successful: {}", throwable.getMessage());
-                LOG.trace("Statistics gathering for single node was not successful.. ", throwable);
+                LOG.warn("Statistics gathering for single node {} was not successful: ", deviceInfo.getLOGValue(), throwable.getMessage());
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Gathering for node {} failure: ", deviceInfo.getLOGValue(), throwable);
+                }
                 calculateTimerDelay(timeCounter);
                 if (throwable instanceof CancellationException) {
                     /* This often happens when something wrong with akka or DS, so closing connection will help to restart device **/
                     contexts.get(deviceInfo).getLifecycleService().closeConnection();
                 } else {
-                    scheduleNextPolling(deviceState, deviceInfo, statisticsContext, timeCounter);
+                    if (throwable instanceof IllegalStateException) {
+                        stopScheduling(deviceInfo);
+                    } else {
+                        scheduleNextPolling(deviceState, deviceInfo, statisticsContext, timeCounter);
+                    }
                 }
             }
         });
@@ -153,7 +163,9 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
                                      final DeviceInfo deviceInfo,
                                      final StatisticsContext statisticsContext,
                                      final TimeCounter timeCounter) {
-        LOG.debug("SCHEDULING NEXT STATISTICS POLLING for device: {}", deviceInfo.getNodeId());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("SCHEDULING NEXT STATISTICS POLLING for device: {}", deviceInfo.getNodeId());
+        }
         if (!isStatisticsPollingEnabled) {
             final Timeout pollTimeout = hashedWheelTimer.newTimeout(
                     timeout -> pollStatistics(
@@ -254,7 +266,7 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
     @Override
     public void startScheduling(final DeviceInfo deviceInfo) {
         if (isStatisticsPollingEnabled) {
-            LOG.info("Statistics are shut down for device: {}", deviceInfo.getNodeId());
+            LOG.info("Statistics are shutdown for device: {}", deviceInfo.getNodeId());
             return;
         }
 
@@ -279,7 +291,9 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
 
     @Override
     public void stopScheduling(final DeviceInfo deviceInfo) {
-        LOG.debug("Stopping statistics scheduling for device: {}", deviceInfo.getNodeId());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Stopping statistics scheduling for device: {}", deviceInfo.getNodeId());
+        }
         final StatisticsContext statisticsContext = contexts.get(deviceInfo);
 
         if (statisticsContext == null) {
index ba1f31b0ed9887000853216c2be637b6c517bdff..3606665575f1219908390ac4250f4bf70ed2ff0b 100644 (file)
@@ -61,7 +61,6 @@ import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
-import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
@@ -70,7 +69,6 @@ import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRe
 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleSource;
 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
-import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
@@ -176,7 +174,6 @@ public class DeviceContextImplTest {
     private DeviceManager deviceManager;
     @Mock
     private ConvertorExecutor convertorExecutor;
-    private LifecycleService lifecycleService;
     @Mock
     private MessageSpy messageSpy;
 
@@ -240,8 +237,10 @@ public class DeviceContextImplTest {
 
         xid = new Xid(atomicLong.incrementAndGet());
         xidMulti = new Xid(atomicLong.incrementAndGet());
+        ((DeviceContextImpl) deviceContext).lazyTransactionManagerInitialiaztion();
 
         Mockito.doNothing().when(deviceContextSpy).writeToTransaction(Mockito.<LogicalDatastoreType>any(), Mockito.<InstanceIdentifier>any(), any());
+
     }
 
     @Test(expected = NullPointerException.class)
@@ -278,7 +277,7 @@ public class DeviceContextImplTest {
     @Test
     public void testAuxiliaryConnectionContext() {
         final ConnectionContext mockedConnectionContext = addDummyAuxiliaryConnectionContext();
-        final ConnectionContext pickedConnectiobContexts = deviceContext.getAuxiliaryConnectiobContexts(DUMMY_COOKIE);
+        final ConnectionContext pickedConnectiobContexts = deviceContext.getAuxiliaryConnectionContexts(DUMMY_COOKIE);
         assertEquals(mockedConnectionContext, pickedConnectiobContexts);
     }
     @Test
@@ -288,9 +287,9 @@ public class DeviceContextImplTest {
         final ConnectionAdapter mockedAuxConnectionAdapter = mock(ConnectionAdapter.class);
         when(mockedConnectionContext.getConnectionAdapter()).thenReturn(mockedAuxConnectionAdapter);
 
-        assertNotNull(deviceContext.getAuxiliaryConnectiobContexts(DUMMY_COOKIE));
+        assertNotNull(deviceContext.getAuxiliaryConnectionContexts(DUMMY_COOKIE));
         deviceContext.removeAuxiliaryConnectionContext(mockedConnectionContext);
-        assertNull(deviceContext.getAuxiliaryConnectiobContexts(DUMMY_COOKIE));
+        assertNull(deviceContext.getAuxiliaryConnectionContexts(DUMMY_COOKIE));
     }
 
     private ConnectionContext addDummyAuxiliaryConnectionContext() {
index 75a233ed4f4ad65173a8212d9d006e80afe42915..4234aa992a005ad44e3934312d2cb73b766eec7c 100644 (file)
@@ -23,6 +23,7 @@ import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
+import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
@@ -48,10 +49,11 @@ public class RoleContextImplTest {
 
     @Before
     public void setup() throws CandidateAlreadyRegisteredException {
-        roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, roleManager);
+        roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, roleManager, lifecycleService);
         roleContext.setSalRoleService(salRoleService);
         Mockito.when(deviceInfo.getNodeId()).thenReturn(nodeId);
         Mockito.when(salRoleService.setRole(Mockito.<SetRoleInput>any())).thenReturn(Futures.immediateFuture(null));
+        Mockito.when(deviceInfo.getNodeInstanceIdentifier()).thenReturn(DeviceStateUtil.createNodeInstanceIdentifier(nodeId));
         roleContextSpy = Mockito.spy((RoleContextImpl) roleContext);
     }
 
@@ -73,7 +75,6 @@ public class RoleContextImplTest {
 
     @Test
     public void startupClusterServices() throws Exception {
-        Mockito.when(deviceInfo.getVersion()).thenReturn(null);
         roleContextSpy.startupClusterServices();
         Mockito.verify(roleContextSpy).sendRoleChangeToDevice(OfpRole.BECOMEMASTER);
     }
@@ -96,13 +97,13 @@ public class RoleContextImplTest {
     public void stopClusterServicesNotDisconnected() throws Exception {
         roleContextSpy.stopClusterServices(false);
         Mockito.verify(roleContextSpy).sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
-        Mockito.verify(roleManager, Mockito.never()).removeDeviceFromOperationalDS(Mockito.<DeviceInfo>any(), Mockito.anyInt());
+        Mockito.verify(roleManager, Mockito.never()).removeDeviceFromOperationalDS(Mockito.<DeviceInfo>any());
     }
 
     @Test
     public void stopClusterServicesDisconnected() throws Exception {
         roleContextSpy.stopClusterServices(true);
-        Mockito.verify(roleManager, Mockito.atLeastOnce()).removeDeviceFromOperationalDS(Mockito.<DeviceInfo>any(), Mockito.anyInt());
+        Mockito.verify(roleManager, Mockito.atLeastOnce()).removeDeviceFromOperationalDS(Mockito.<DeviceInfo>any());
     }
 
     @Test
index adec48770bcec9218613cab03b0adc2bb5f598c6..c1e29b75cc4489a2bf12649f0493e358e56814d3 100644 (file)
@@ -39,9 +39,13 @@ import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTermin
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
+import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 
 @RunWith(MockitoJUnitRunner.class)
 public class RoleManagerImplTest {
@@ -106,6 +110,7 @@ public class RoleManagerImplTest {
         Mockito.when(deviceInfo.getNodeId()).thenReturn(nodeId);
         Mockito.when(deviceInfo2.getNodeId()).thenReturn(nodeId2);
         Mockito.when(deviceInfo.getDatapathId()).thenReturn(BigInteger.TEN);
+        Mockito.when(deviceInfo.getNodeInstanceIdentifier()).thenReturn(DeviceStateUtil.createNodeInstanceIdentifier(nodeId));
         Mockito.when(lifecycleService.getDeviceContext()).thenReturn(deviceContext);
         roleManager = new RoleManagerImpl(dataBroker, new HashedWheelTimer());
         roleManager.setDeviceInitializationPhaseHandler(deviceInitializationPhaseHandler);
@@ -132,7 +137,7 @@ public class RoleManagerImplTest {
     @Test
     public void testCloseMaster() throws Exception {
         roleManagerSpy.close();
-        inOrder.verify(roleManagerSpy).removeDeviceFromOperationalDS(Mockito.eq(deviceInfo), Mockito.anyInt());
+        inOrder.verify(roleManagerSpy).removeDeviceFromOperationalDS(Mockito.eq(deviceInfo));
         inOrder.verifyNoMoreInteractions();
     }