Merge "Bug-4957 Cluster Role change fix"
authormichal rehak <mirehak@cisco.com>
Wed, 2 Mar 2016 16:46:53 +0000 (16:46 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 2 Mar 2016 16:46:53 +0000 (16:46 +0000)
23 files changed:
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceState.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/statistics/StatisticsContext.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceStateImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/DeviceInitializationUtils.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/MdSalRegistratorUtils.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/role/RoleContextImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImplParamTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsManagerImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/util/MdSalRegistratorUtilsTest.java

index d0c282d0ab42896d54dfeec212584903234fd00a..0e96d1aeb85a11e83bf3c6673385303dabc6e95e 100644 (file)
@@ -8,13 +8,13 @@
 
 package org.opendaylight.openflowplugin.api.openflow.device;
 
-import javax.annotation.CheckForNull;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.util.Timeout;
 import java.math.BigInteger;
 import java.util.List;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
 
-import io.netty.util.Timeout;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
@@ -30,6 +30,7 @@ import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegi
 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
+import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
@@ -89,9 +90,10 @@ public interface DeviceContext extends AutoCloseable,
      * newRole is {@link OfpRole#BECOMESLAVE}.
      * Parameters are used as marker to be sure it is change to SLAVE from MASTER or from
      * MASTER to SLAVE and the last parameter "cleanDataStore" is used for validation only.
+     * @param oldRole - old role for quick validation for needed processing
      * @param role - NewRole expect to be {@link OfpRole#BECOMESLAVE} or {@link OfpRole#BECOMEMASTER}
      */
-    ListenableFuture<Void> onClusterRoleChange(@CheckForNull OfpRole role);
+    ListenableFuture<Void> onClusterRoleChange(@Nullable OfpRole oldRole, @CheckForNull OfpRole role);
 
     /**
      * Method creates put operation using provided data in underlying transaction chain.
@@ -188,7 +190,12 @@ public interface DeviceContext extends AutoCloseable,
 
     MultiMsgCollector getMultiMsgCollector(final RequestContext<List<MultipartReply>> requestContext);
 
-    Long getReservedXid();
+    /**
+     * Method is reserved unique XID for Device Message.
+     * Attention: OFJava expect the message, otherwise OutboundQueue could stop working.
+     * @return Reserved XID
+     */
+    Long reservedXidForDeviceMessage();
 
     /**
      * indicates that device context is fully published (e.g.: packetIn messages should be passed)
@@ -211,6 +218,10 @@ public interface DeviceContext extends AutoCloseable,
 
     RpcContext getRpcContext();
 
+    void setStatisticsContext(StatisticsContext statisticsContext);
+
+    StatisticsContext getStatisticsContext();
+
     @Override
     void close();
 }
index 4db91c184bb6043b34a39d0677301115694a46f3..d5fcf47ce94b0beb337487d2ed15af5d988afe11 100644 (file)
@@ -127,5 +127,8 @@ public interface DeviceState {
 
     OfpRole getRole();
 
+    boolean isStatisticsPollingEnabled();
+
+    void setStatisticsPollingEnabledProp(boolean statPollEnabled);
 
 }
index e7c54940ff82a330e824b819c88f668a2defea27..1806b184def74568ff30e381825461f19ec4341b 100644 (file)
@@ -21,6 +21,16 @@ public interface StatisticsContext extends RequestContextStack, AutoCloseable {
 
     ListenableFuture<Boolean> gatherDynamicData();
 
+    /**
+     * Method has to be called from DeviceInitialization Method, otherwise
+     * we are not able to poll anything. Statistics Context normally initialize
+     * this part by initialization process but we don't have this information
+     * in initialization phase and we have to populate whole list after every
+     * device future collecting. Because device future collecting set DeviceState
+     * and we creating marks for the correct kind of stats from DeviceState.
+     */
+    void statListForCollectingInitialization();
+
     /**
      * @param pollTimeout handle to nearest scheduled statistics poll
      */
@@ -35,4 +45,6 @@ public interface StatisticsContext extends RequestContextStack, AutoCloseable {
      * @return dedicated item life cycle change listener (per device)
      */
     ItemLifecycleListener getItemLifeCycleListener();
+    @Override
+    void close();
 }
index 296334dd36554616b9822fcf2ef9af9b04e6d9e1..d5b1cf8874a88f62d61a737e846fdb4149cb7471 100644 (file)
@@ -170,7 +170,7 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
 
         registerMXBean(messageIntelligenceAgency);
 
-        deviceManager = new DeviceManagerImpl(dataBroker, messageIntelligenceAgency, globalNotificationQuota);
+        deviceManager = new DeviceManagerImpl(dataBroker, messageIntelligenceAgency, globalNotificationQuota, switchFeaturesMandatory);
         ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
         roleManager = new RoleManagerImpl(entityOwnershipService, dataBroker, switchFeaturesMandatory);
         statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, isStatisticsPollingOff);
index c9a17b66ff0ad51d79760821b3823975eaf4db1e..70bd4532cce3e29254d2b34225474eca487ae99f 100644 (file)
@@ -9,7 +9,9 @@ package org.opendaylight.openflowplugin.impl.device;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -33,6 +35,7 @@ import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
@@ -58,6 +61,7 @@ import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRe
 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
+import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
@@ -71,6 +75,7 @@ import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl
 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
+import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
 import org.opendaylight.openflowplugin.impl.util.MdSalRegistratorUtils;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
@@ -148,6 +153,9 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     private RpcContext rpcContext;
     private ExtensionConverterProvider extensionConverterProvider;
 
+    private final boolean switchFeaturesMandatory;
+    private StatisticsContext statCtx;
+
 
     @VisibleForTesting
     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
@@ -156,7 +164,9 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
                       @Nonnull final HashedWheelTimer hashedWheelTimer,
                       @Nonnull final MessageSpy _messageSpy,
                       @Nonnull final OutboundQueueProvider outboundQueueProvider,
-            @Nonnull final TranslatorLibrary translatorLibrary) {
+                      @Nonnull final TranslatorLibrary translatorLibrary,
+                      final boolean switchFeaturesMandatory) {
+        this.switchFeaturesMandatory = switchFeaturesMandatory;
         this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
         this.deviceState = Preconditions.checkNotNull(deviceState);
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
@@ -198,7 +208,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     }
 
     @Override
-    public Long getReservedXid() {
+    public Long reservedXidForDeviceMessage() {
         return outboundQueueProvider.reserveEntry();
     }
 
@@ -228,35 +238,99 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     }
 
     @Override
-    public ListenableFuture<Void> onClusterRoleChange(@CheckForNull final OfpRole role) {
-        LOG.debug("onClusterRoleChange {} for node:", role, deviceState.getNodeId());
+    public ListenableFuture<Void> onClusterRoleChange(final OfpRole oldRole, @CheckForNull final OfpRole role) {
+        LOG.trace("onClusterRoleChange {} for node:", role, deviceState.getNodeId());
         Preconditions.checkArgument(role != null);
-        if (rpcContext != null) {
-            // TODO : implement interface for onClusterRoleChange method
-            MdSalRegistratorUtils.registerServices(rpcContext, this, role);
+        if (role.equals(oldRole)) {
+            LOG.debug("Demanded role change for device {} is not change OldRole: {}, NewRole {}", deviceState.getNodeId(), oldRole, role);
+            return Futures.immediateFuture(null);
         }
-
-        final ListenableFuture<Void> nextStepFuture;
         if (OfpRole.BECOMEMASTER.equals(role)) {
-            transactionChainManager.activateTransactionManager();
-            nextStepFuture = Futures.immediateCheckedFuture(null);
-            getDeviceState().setRole(role);
+            if (!deviceState.deviceSynchronized()) {
+                LOG.debug("Setup Device Ctx {} for Master Role", getDeviceState().getNodeId());
+                transactionChainManager.activateTransactionManager();
+                return Futures.immediateCheckedFuture(null);
+            }
+            /* Relevant for no initial Slave-to-Master scenario in cluster */
+            return asyncClusterRoleChange(role);
         } else if (OfpRole.BECOMESLAVE.equals(role)) {
-            nextStepFuture = transactionChainManager.deactivateTransactionManager();
-            Futures.transform(nextStepFuture, new Function<Void, Void>() {
-                @Nullable
-                @Override
-                public Void apply(@Nullable final Void aVoid) {
-                    getDeviceState().setRole(role);
-                    return null;
-                }
-            });
+            if (rpcContext != null) {
+                MdSalRegistratorUtils.registerSlaveServices(rpcContext, role);
+            }
+            return transactionChainManager.deactivateTransactionManager();
         } else {
-            LOG.warn("Unknow OFCluster Role {} for Node {}", role, deviceState.getNodeId());
-            nextStepFuture = Futures.immediateCheckedFuture(null);
+            LOG.warn("Unknown OFCluster Role {} for Node {}", role, deviceState.getNodeId());
+            if (rpcContext != null) {
+                MdSalRegistratorUtils.unregisterServices(rpcContext);
+            }
+            return transactionChainManager.deactivateTransactionManager();
+        }
+    }
+
+    /*
+     * we don't have active TxManager so anything will not be stored to DS yet, but we have
+     * check all NodeInformation for statistics otherwise statistics will not contains
+     * all possible MultipartTypes for polling in StatTypeList
+     */
+    private ListenableFuture<Void> asyncClusterRoleChange(final OfpRole role) {
+        if (statCtx == null) {
+            final String errMsg = String.format("DeviceCtx {} is up but we are missing StatisticsContext", deviceState.getNodeId());
+            LOG.warn(errMsg);
+            return Futures.immediateFailedFuture(new IllegalStateException(errMsg));
         }
 
-        return nextStepFuture;
+        final InstanceIdentifier<FlowCapableNode> ofNodeII = deviceState.getNodeInstanceIdentifier()
+                .augmentation(FlowCapableNode.class);
+        final ReadOnlyTransaction readTx = getReadTransaction();
+        final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> readOfNodeFuture = readTx.read(
+                LogicalDatastoreType.OPERATIONAL, ofNodeII);
+
+        final ListenableFuture<Void> nodeInitInfoFuture = Futures.transform(readOfNodeFuture,
+                new AsyncFunction<Optional<FlowCapableNode>, Void>() {
+                    @Override
+                    public ListenableFuture<Void> apply(final Optional<FlowCapableNode> input) throws Exception {
+                        if (!input.isPresent() || input.get().getTable().isEmpty()) {
+                            /* Last master close fail scenario so we would like to activate TxManager */
+                            LOG.debug("Operational DS for Device {} has to be replaced", deviceState.getNodeId());
+                            getDeviceState().setDeviceSynchronized(false);
+                            transactionChainManager.activateTransactionManager();
+                        }
+                        return DeviceInitializationUtils.initializeNodeInformation(DeviceContextImpl.this, switchFeaturesMandatory);
+                    }
+                });
+
+        final ListenableFuture<Boolean> statPollFuture = Futures.transform(nodeInitInfoFuture,
+                new AsyncFunction<Void, Boolean>() {
+
+                    @Override
+                    public ListenableFuture<Boolean> apply(final Void input) throws Exception {
+                        getStatisticsContext().statListForCollectingInitialization();
+                        if (getDeviceState().deviceSynchronized()) {
+                            return Futures.immediateFuture(Boolean.TRUE);
+                        }
+                        return getStatisticsContext().gatherDynamicData();
+                    }
+                });
+
+        return Futures.transform(statPollFuture, new Function<Boolean, Void>() {
+
+            @Override
+            public Void apply(final Boolean input) {
+                if (!input.booleanValue()) {
+                    LOG.warn("Get Initial Device {} information fails", getDeviceState().getNodeId());
+                    DeviceContextImpl.this.close();
+                    return null;
+                }
+                LOG.debug("Get Initial Device {} information is successful", getDeviceState().getNodeId());
+                if (null != getRpcContext()) {
+                    MdSalRegistratorUtils.registerMasterServices(getRpcContext(), DeviceContextImpl.this, role);
+                }
+                getDeviceState().setDeviceSynchronized(true);
+                getDeviceState().setStatisticsPollingEnabledProp(true);
+                transactionChainManager.activateTransactionManager();
+                return null;
+            }
+        });
     }
 
     @Override
@@ -482,13 +556,13 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
 
                 @Override
                 public void onSuccess(final Void result) {
-                    LOG.info("Delete Node {} was successfull.", deviceState.getNodeId());
+                    LOG.info("TxChain {} was shutdown successfull.", deviceState.getNodeId());
                     tearDownClean();
                 }
 
                 @Override
                 public void onFailure(final Throwable t) {
-                    LOG.warn("Delete Node {} fail.", deviceState.getNodeId(), t);
+                    LOG.warn("Shutdown TxChain {} fail.", deviceState.getNodeId(), t);
                     tearDownClean();
                 }
             });
@@ -609,4 +683,14 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     public ExtensionConverterProvider getExtensionConverterProvider() {
         return extensionConverterProvider;
     }
+
+    @Override
+    public void setStatisticsContext(final StatisticsContext statisticsContext) {
+        this.statCtx = statisticsContext;
+    }
+
+    @Override
+    public StatisticsContext getStatisticsContext() {
+        return statCtx;
+    }
 }
index ec0244356788b16624bb4dbafe083a59b81da87a..ca2567b091ef905af56b7cfb2f38984ec5261b5d 100644 (file)
@@ -55,6 +55,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
 
     private static final long TICK_DURATION = 10; // 0.5 sec.
     private final long globalNotificationQuota;
+    private final boolean switchFeaturesMandatory;
     private ScheduledThreadPoolExecutor spyPool;
     private final int spyRate = 10;
 
@@ -74,7 +75,8 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
 
     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
                              @Nonnull final MessageIntelligenceAgency messageIntelligenceAgency,
-                             final long globalNotificationQuota) {
+                             final long globalNotificationQuota, final boolean switchFeaturesMandatory) {
+        this.switchFeaturesMandatory = switchFeaturesMandatory;
         this.globalNotificationQuota = globalNotificationQuota;
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
         hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, 500);
@@ -103,6 +105,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
     @Override
     public void onDeviceContextLevelUp(final DeviceContext deviceContext) {
         // final phase - we have to add new Device to MD-SAL DataStore
+        LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceContext.getDeviceState().getNodeId());
         Preconditions.checkNotNull(deviceContext);
         try {
             ((DeviceContextImpl) deviceContext).initialSubmitTransaction();
@@ -113,7 +116,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
             LOG.trace("Problem with add node {} to OPERATIONAL DataStore", deviceContext.getDeviceState().getNodeId(), e);
             try {
                 deviceContext.close();
-            } catch (Exception e1) {
+            } catch (final Exception e1) {
                 LOG.warn("Exception on device context close. ", e);
             }
         }
@@ -145,7 +148,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
 
         final DeviceState deviceState = createDeviceState(connectionContext);
         final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker,
-                hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary);
+                hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, switchFeaturesMandatory);
 
         deviceContext.addDeviceContextClosedHandler(this);
         Verify.verify(deviceContexts.putIfAbsent(connectionContext.getNodeId(), deviceContext) == null);
@@ -230,7 +233,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
     }
 
     @Override
-    public void setExtensionConverterProvider(ExtensionConverterProvider extensionConverterProvider) {
+    public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
         this.extensionConverterProvider = extensionConverterProvider;
     }
 
index 0a69374b5807040517c59797f2c05281baf65b0c..d49021ed1e067e1e508b2617f9f148b6dfb2332c 100644 (file)
@@ -46,6 +46,7 @@ class DeviceStateImpl implements DeviceState {
     private boolean flowStatisticsAvailable;
     private boolean tableStatisticsAvailable;
     private boolean portStatisticsAvailable;
+    private boolean statPollEnabled;
     private boolean queueStatisticsAvailable;
     private volatile OfpRole role;
 
@@ -55,6 +56,8 @@ class DeviceStateImpl implements DeviceState {
         this.nodeId = Preconditions.checkNotNull(nodeId);
         nodeII = DeviceStateUtil.createNodeInstanceIdentifier(nodeId);
         version = featuresReply.getVersion();
+        statPollEnabled = false;
+        deviceSynchronized = false;
     }
 
     @Override
@@ -167,4 +170,14 @@ class DeviceStateImpl implements DeviceState {
     public void setRole(OfpRole role) {
         this.role = role;
     }
+
+    @Override
+    public boolean isStatisticsPollingEnabled() {
+        return statPollEnabled;
+    }
+
+    @Override
+    public void setStatisticsPollingEnabledProp(final boolean statPollEnabled) {
+        this.statPollEnabled = statPollEnabled;
+    }
 }
index 482d2574ad283bf964f195b18d3973d85b26b997..b178f66b5607a8ec1a983a52e3f764a165b94cbc 100644 (file)
@@ -12,7 +12,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.FutureFallback;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
@@ -97,7 +99,7 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
      * transactions. Call this method for MASTER role only.
      */
     public void activateTransactionManager() {
-        LOG.trace("activetTransactionManaager for node {} transaction submit is set to {}", deviceState.getNodeId());
+        LOG.trace("activateTransactionManager for node {} transaction submit is set to {}", deviceState.getNodeId());
         synchronized (txLock) {
             if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) {
                 LOG.debug("Transaction Factory create {}", deviceState.getNodeId());
@@ -264,6 +266,15 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
             wTx.merge(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
             future = wTx.submit();
             wTx = null;
+            Futures.withFallback(future, new FutureFallback<Void>() {
+
+                @Override
+                public ListenableFuture<Void> create(final Throwable t) throws Exception {
+                    final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
+                    delWtx.put(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
+                    return delWtx.submit();
+                }
+            });
         }
         return future;
     }
index 41bbe2eea4e6521843793fc09b52f8a3cff5cd50..c125f030371e136ebf1a6aeb64eacde187f1fcee 100644 (file)
@@ -81,9 +81,9 @@ public class RoleContextImpl implements RoleContext {
     public void initialization() throws CandidateAlreadyRegisteredException {
         state = ROLE_CONTEXT_STATE.STARTING;
         LOG.debug("Initialization requestOpenflowEntityOwnership for entity {}", entity);
-            entityOwnershipCandidateRegistration = entityOwnershipService.registerCandidate(entity);
-            LOG.debug("RoleContextImpl : Candidate registered with ownership service for device :{}", deviceContext
-                    .getPrimaryConnectionContext().getNodeId().getValue());
+        entityOwnershipCandidateRegistration = entityOwnershipService.registerCandidate(entity);
+        LOG.debug("RoleContextImpl : Candidate registered with ownership service for device :{}",
+                deviceContext.getPrimaryConnectionContext().getNodeId().getValue());
     }
 
     @Override
@@ -103,43 +103,38 @@ public class RoleContextImpl implements RoleContext {
         LOG.debug("Role change received from ownership listener from {} to {} for device:{}", oldRole, newRole,
                 deviceContext.getPrimaryConnectionContext().getNodeId());
 
-        final SetRoleInput setRoleInput = (new SetRoleInputBuilder())
-                .setControllerRole(newRole)
-                .setNode(new NodeRef(deviceContext.getDeviceState().getNodeInstanceIdentifier()))
-                .build();
-
-        final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture = salRoleService.setRole(setRoleInput);
-
-        return Futures.transform(JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture),
-                new AsyncFunction<RpcResult<SetRoleOutput>, Void>() {
-                    @Override
-                    public ListenableFuture<Void> apply(final RpcResult<SetRoleOutput> setRoleOutputRpcResult) throws Exception {
-                        LOG.debug("Rolechange {} successful made on switch :{}", newRole, deviceContext.getDeviceState().getNodeId());
-                        final ListenableFuture<Void> nextStepFuture;
-                        switch (state) {
-                            case STARTING:
-                                if (OfpRole.BECOMESLAVE.equals(newRole)) {
-                                    getDeviceState().setRole(newRole);
-                                    nextStepFuture = Futures.immediateFuture(null);
-                                } else if (OfpRole.BECOMEMASTER.equals(newRole)) {
-                                    nextStepFuture = deviceContext.onClusterRoleChange(newRole);
-                                } else {
-                                    nextStepFuture = Futures.immediateFuture(null);
-                                }
-
-                                break;
-                            case WORKING:
-                                nextStepFuture = deviceContext.onClusterRoleChange(newRole);
-                                break;
-                            //case TEARING_DOWN:
-                            default:
-                                nextStepFuture = Futures.immediateFuture(null);
-                                break;
-                        }
-
-                        return nextStepFuture;
+
+        final AsyncFunction<RpcResult<SetRoleOutput>, Void> roleChangeRpcFunction = new AsyncFunction<RpcResult<SetRoleOutput>, Void>() {
+            @Override
+            public ListenableFuture<Void> apply(final RpcResult<SetRoleOutput> setRoleOutputRpcResult) throws Exception {
+                LOG.debug("Rolechange {} successful made on switch :{}", newRole, deviceContext.getDeviceState().getNodeId());
+                final ListenableFuture<Void> nextStepFuture;
+                switch (state) {
+                case STARTING:
+                    if (OfpRole.BECOMESLAVE.equals(newRole)) {
+                        getDeviceState().setRole(newRole);
+                        nextStepFuture = Futures.immediateFuture(null);
+                    } else if (OfpRole.BECOMEMASTER.equals(newRole)) {
+                        nextStepFuture = deviceContext.onClusterRoleChange(oldRole, newRole);
+                    } else {
+                        nextStepFuture = Futures.immediateFuture(null);
                     }
-                });
+
+                    break;
+                case WORKING:
+                    nextStepFuture = deviceContext.onClusterRoleChange(oldRole, newRole);
+                    break;
+                //case TEARING_DOWN:
+                default:
+                    nextStepFuture = Futures.immediateFuture(null);
+                    break;
+                }
+
+                return nextStepFuture;
+            }
+        };
+
+        return sendRoleChangeToDevice(newRole, roleChangeRpcFunction);
     }
 
     @Override
@@ -177,7 +172,7 @@ public class RoleContextImpl implements RoleContext {
     @Nullable
     @Override
     public <T> RequestContext<T> createRequestContext() {
-        final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.getReservedXid()) {
+        final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.reservedXidForDeviceMessage()) {
             @Override
             public void close() {
             }
index 9e5e1ff1aaacb4aa1aee5a58c640ac8e90f41d70..3ed9c45390a94cdd79a508e318b0421ebe6906af 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.openflowplugin.impl.role;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.base.Verify;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.CheckedFuture;
@@ -35,7 +34,6 @@ import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipS
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
@@ -83,11 +81,6 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
     @Override
     public void onDeviceContextLevelUp(@CheckForNull final DeviceContext deviceContext) throws Exception {
         LOG.debug("RoleManager called for device:{}", deviceContext.getPrimaryConnectionContext().getNodeId());
-        if (deviceContext.getDeviceState().getFeatures().getVersion() < OFConstants.OFP_VERSION_1_3) {
-            // Roles are not supported before OF1.3, so move forward.
-            deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
-            return;
-        }
 
         final RoleContext roleContext = new RoleContextImpl(deviceContext, entityOwnershipService,
                 makeEntity(deviceContext.getDeviceState().getNodeId()),
@@ -188,21 +181,26 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
     @Override
     public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
         Preconditions.checkArgument(ownershipChange != null);
+        RoleContext roleCtxForClose = null;
         try {
             final RoleContext roleContext = contexts.get(ownershipChange.getEntity());
             if (roleContext != null) {
+                roleCtxForClose = roleContext;
                 changeForEntity(ownershipChange, roleContext);
                 return;
             }
 
             final RoleContext txRoleContext = txContexts.get(ownershipChange.getEntity());
             if (txRoleContext != null) {
+                roleCtxForClose = txRoleContext;
                 changeForTxEntity(ownershipChange, txRoleContext);
                 return;
             }
         } catch (final InterruptedException e) {
             LOG.warn("fail to acquire semaphore: {}", ownershipChange.getEntity());
-            // FIXME: consider forcibly closing this connection
+            if (roleCtxForClose != null) {
+                roleCtxForClose.close();
+            }
         }
 
         LOG.debug("We are not able to find Entity {} ownershipChange {} - disregarding ownership notification",
@@ -337,21 +335,19 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
                 try {
                     LOG.debug("Node {} is marked as LEADER", nodeId);
                     Verify.verify(txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext) == null,
-                            "RoleCtx for TxEntity {} master Node {} is still not closed.",
-                            roleContext.getTxEntity(), nodeId);
+                            "RoleCtx for TxEntity {} master Node {} is still not closed.", roleContext.getTxEntity(), nodeId);
                     roleContext.setPropagatingRole(OfpRole.BECOMEMASTER);
 
                     // try to register tx-candidate via ownership service
                     roleContext.setupTxCandidate();
                 } catch (final CandidateAlreadyRegisteredException e) {
-                    LOG.debug("txCandidate registration failed");
+                    LOG.warn("txCandidate registration failed {}", roleContext.getDeviceState().getNodeId(), e);
                     // --- CLEAN UP ---
                     // withdraw context from map in order to have it as before
                     txContexts.remove(roleContext.getTxEntity(), roleContext);
                     // no more propagating any role - there is no txCandidate lock approaching
                     roleContext.setPropagatingRole(null);
-
-                    Throwables.propagate(e);
+                    roleContext.getDeviceContext().close();
                 }
                 return null;
             }
@@ -362,7 +358,6 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
         final Semaphore mainCandidateGuard = roleContext.getMainCandidateGuard();
         LOG.trace("mainCandidate lock queue: " + mainCandidateGuard.getQueueLength());
         mainCandidateGuard.acquire();
-        //FIXME : check again implementation for double candidate scenario
         LOG.info("Received EntityOwnershipChange:{}", ownershipChange);
 
         if (roleContext.getDeviceState().isValid()) {
@@ -382,9 +377,14 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
                 } else {
                     txProcessCallback = null;
                 }
-            } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.isOwner()) {
+            } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.hasOwner()) {
                 // SLAVE -> MASTER
                 txProcessCallback = makeTxEntitySetupCallback(roleContext);
+            } else if (!ownershipChange.wasOwner() && !ownershipChange.isOwner() && ownershipChange.hasOwner()) {
+                if (RoleContext.ROLE_CONTEXT_STATE.STARTING.equals(roleContext.getState())) {
+                    rolePropagationFx = roleContext.onRoleChanged(oldRole, newRole);
+                }
+                txProcessCallback = null;
             } else {
                 LOG.trace("Main candidate role change case not covered: {} -> {} .. NOOP", oldRole, newRole);
                 txProcessCallback = null;
@@ -396,22 +396,22 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
 
             // catching result
             Futures.addCallback(rolePropagationFx, new FutureCallback<Void>() {
-                        @Override
-                        public void onSuccess(@Nullable final Void aVoid) {
-                            LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}",
-                                    ownershipChange.getEntity(), oldRole, newRole);
-                            mainCandidateGuard.release();
-                        }
+                @Override
+                public void onSuccess(@Nullable final Void aVoid) {
+                    LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}",
+                            ownershipChange.getEntity(), oldRole, newRole);
+                    roleContext.setPropagatingRole(newRole);
+                    mainCandidateGuard.release();
+                }
 
-                        @Override
-                        public void onFailure(final Throwable throwable) {
-                            LOG.warn("Main candidate role propagation FAILED for entity: {}, {} -> {}",
-                                    ownershipChange.getEntity(), oldRole, newRole);
-                            mainCandidateGuard.release();
-                            roleContext.getDeviceContext().close();
-                        }
-                    }
-            );
+                @Override
+                public void onFailure(final Throwable throwable) {
+                    LOG.warn("Main candidate role propagation FAILED for entity: {}, {} -> {}",
+                            ownershipChange.getEntity(), oldRole, newRole);
+                    mainCandidateGuard.release();
+                    roleContext.getDeviceContext().close();
+                }
+            });
 
         } else {
             LOG.debug("We are closing connection for entity {}", ownershipChange.getEntity());
@@ -451,22 +451,23 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
         return delFuture;
     }
 
-    private void unregistrationHelper(final EntityOwnershipChange ownershipChange, final RoleChangeListener roleChangeListener) {
+
+    private void unregistrationHelper(final EntityOwnershipChange ownershipChange, final RoleContext roleContext) {
         LOG.info("Initiate removal from operational. Possibly the last node to be disconnected for :{}. ", ownershipChange);
-        Futures.addCallback(removeDeviceFromOperDS(roleChangeListener), new FutureCallback<Void>() {
+        Futures.addCallback(removeDeviceFromOperDS(roleContext), new FutureCallback<Void>() {
             @Override
             public void onSuccess(@Nullable final Void aVoid) {
-                LOG.debug("Freeing roleContext slot for device: {}", roleChangeListener.getDeviceState().getNodeId());
-                contexts.remove(ownershipChange.getEntity(), roleChangeListener);
-                ((RoleContext) roleChangeListener).suspendTxCandidate();
+                LOG.debug("Freeing roleContext slot for device: {}", roleContext.getDeviceState().getNodeId());
+                contexts.remove(ownershipChange.getEntity(), roleContext);
+                roleContext.suspendTxCandidate();
             }
 
             @Override
             public void onFailure(final Throwable throwable) {
-                LOG.warn("NOT freeing roleContext slot for device: {}, {}", roleChangeListener.getDeviceState()
+                LOG.warn("NOT freeing roleContext slot for device: {}, {}", roleContext.getDeviceState()
                         .getNodeId(), throwable.getMessage());
-                contexts.remove(ownershipChange.getEntity(), roleChangeListener);
-                ((RoleContext) roleChangeListener).suspendTxCandidate();
+                contexts.remove(ownershipChange.getEntity(), roleContext);
+                roleContext.suspendTxCandidate();
             }
         });
     }
index 9b4cdaffbe4e71ba07f15829135d0cf96cab5f21..b450077582f1238fc11ac25b8d23bdf75a1f1b1e 100644 (file)
@@ -97,12 +97,12 @@ public class RpcContextImpl implements RpcContext {
             LOG.info("Acquired semaphore for {}, available permits:{} ", deviceContext.getDeviceState().getNodeId(), tracker.availablePermits());
         }
 
-        final Long xid = deviceContext.getReservedXid();
+        final Long xid = deviceContext.reservedXidForDeviceMessage();
         if (xid == null) {
             LOG.error("Xid cannot be reserved for new RequestContext, node:{}", deviceContext.getDeviceState().getNodeId());
         }
 
-        return new AbstractRequestContext<T>(deviceContext.getReservedXid()) {
+        return new AbstractRequestContext<T>(deviceContext.reservedXidForDeviceMessage()) {
             @Override
             public void close() {
                 tracker.release();
index 94d72a3494a37ad98dc00ad2caa6d9dc1112931e..67d08bb0a72c3d0fa2dc9e71d3e5eddf6e2dcd30 100644 (file)
@@ -45,8 +45,8 @@ public class RpcManagerImpl implements RpcManager {
 
     @Override
     public void onDeviceContextLevelUp(final DeviceContext deviceContext) throws Exception {
-        NodeId nodeId = deviceContext.getDeviceState().getNodeId();
-        OfpRole ofpRole = deviceContext.getDeviceState().getRole();
+        final NodeId nodeId = deviceContext.getDeviceState().getNodeId();
+        final OfpRole ofpRole = deviceContext.getDeviceState().getRole();
 
         LOG.debug("Node:{}, deviceContext.getDeviceState().getRole():{}", nodeId, ofpRole);
 
@@ -56,28 +56,26 @@ public class RpcManagerImpl implements RpcManager {
             contexts.put(deviceContext, rpcContext);
         }
 
+        deviceContext.addDeviceContextClosedHandler(this);
 
-        if (ofpRole == OfpRole.BECOMESLAVE) {
-            // if slave, we need to de-register rpcs if any have been registered, in case of master to slave
-            LOG.info("Unregistering RPC registration (if any) for slave role for node:{}", deviceContext.getDeviceState().getNodeId());
-            try {
-                MdSalRegistratorUtils.unregisterServices(rpcContext);
-            } catch (Exception e) {
-                LOG.error("Exception while unregistering rpcs for slave role for node:{}. But continuing.", nodeId, e);
-            }
-
-        } else {
+        if (OfpRole.BECOMEMASTER.equals(ofpRole)) {
             LOG.info("Registering Openflow RPCs for node:{}, role:{}", nodeId, ofpRole);
-            MdSalRegistratorUtils.registerServices(rpcContext, deviceContext, ofpRole);
+            MdSalRegistratorUtils.registerMasterServices(rpcContext, deviceContext, ofpRole);
 
             if (isStatisticsRpcEnabled) {
                 MdSalRegistratorUtils.registerStatCompatibilityServices(rpcContext, deviceContext,
                         notificationPublishService, new AtomicLong());
             }
+        } else if(OfpRole.BECOMESLAVE.equals(ofpRole)) {
+            // if slave, we need to de-register rpcs if any have been registered, in case of master to slave
+            LOG.info("Unregistering RPC registration (if any) for slave role for node:{}", deviceContext.getDeviceState().getNodeId());
+            MdSalRegistratorUtils.registerSlaveServices(rpcContext, ofpRole);
+        } else {
+            // if we don't know role, we need to unregister rpcs if any have been registered
+            LOG.info("Unregistering RPC registration (if any) for slave role for node:{}", deviceContext.getDeviceState().getNodeId());
+            MdSalRegistratorUtils.unregisterServices(rpcContext);
         }
 
-        deviceContext.addDeviceContextClosedHandler(this);
-
         // finish device initialization cycle back to DeviceManager
         deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
     }
@@ -89,24 +87,25 @@ public class RpcManagerImpl implements RpcManager {
 
 
     @Override
-    public void onDeviceContextClosed(DeviceContext deviceContext) {
-        RpcContext removedContext = contexts.remove(deviceContext);
+    public void onDeviceContextClosed(final DeviceContext deviceContext) {
+        final RpcContext removedContext = contexts.remove(deviceContext);
         if (removedContext != null) {
             try {
                 LOG.info("Unregistering rpcs for device context closure");
                 removedContext.close();
-            } catch (Exception e) {
+            } catch (final Exception e) {
                 LOG.error("Exception while unregistering rpcs onDeviceContextClosed handler for node:{}. But continuing.",
                         deviceContext.getDeviceState().getNodeId(), e);
             }
         }
     }
-    public void setStatisticsRpcEnabled(boolean isStatisticsRpcEnabled) {
+    @Override
+    public void setStatisticsRpcEnabled(final boolean isStatisticsRpcEnabled) {
         this.isStatisticsRpcEnabled = isStatisticsRpcEnabled;
     }
 
     @Override
-    public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
+    public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
         this.notificationPublishService = notificationPublishService;
     }
 }
index 3ef1a95eb753bc1d1179f0e0f45a4f0b931b66ef..46ecf390ce3ca11c6ab8b71babc0f19bc80c79c4 100644 (file)
@@ -12,6 +12,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -23,6 +24,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import javax.annotation.CheckForNull;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
@@ -51,213 +53,233 @@ public class StatisticsContextImpl implements StatisticsContext {
     private final DeviceContext deviceContext;
     private final DeviceState devState;
     private final ListenableFuture<Boolean> emptyFuture;
-    private final List<MultipartType> collectingStatType;
+    private final boolean shuttingDownStatisticsPolling;
+    private final Object COLLECTION_STAT_TYPE_LOCK = new Object();
+    @GuardedBy("COLLECTION_STAT_TYPE_LOCK")
+    private List<MultipartType> collectingStatType;
 
     private StatisticsGatheringService statisticsGatheringService;
     private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService;
     private Timeout pollTimeout;
 
-    public StatisticsContextImpl(@CheckForNull final DeviceContext deviceContext) {
+    public StatisticsContextImpl(@CheckForNull final DeviceContext deviceContext,
+            final boolean shuttingDownStatisticsPolling) {
         this.deviceContext = Preconditions.checkNotNull(deviceContext);
-        devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
+        this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
+        this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling;
         emptyFuture = Futures.immediateFuture(new Boolean(false));
         statisticsGatheringService = new StatisticsGatheringService(this, deviceContext);
         statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext);
-
-        final List<MultipartType> statListForCollecting = new ArrayList<>();
-        if (devState.isTableStatisticsAvailable()) {
-            statListForCollecting.add(MultipartType.OFPMPTABLE);
-        }
-        if (devState.isFlowStatisticsAvailable()) {
-            statListForCollecting.add(MultipartType.OFPMPFLOW);
-        }
-        if (devState.isGroupAvailable()) {
-            statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
-            statListForCollecting.add(MultipartType.OFPMPGROUP);
-        }
-        if (devState.isMetersAvailable()) {
-            statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
-            statListForCollecting.add(MultipartType.OFPMPMETER);
-        }
-        if (devState.isPortStatisticsAvailable()) {
-            statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
-        }
-        if (devState.isQueueStatisticsAvailable()) {
-            statListForCollecting.add(MultipartType.OFPMPQUEUE);
-        }
-        collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
         itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
+        statListForCollectingInitialization();
+        deviceContext.setStatisticsContext(StatisticsContextImpl.this);
     }
 
     @Override
-    public ListenableFuture<Boolean> gatherDynamicData() {
-        final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
-        if (errorResultFuture != null) {
-            return errorResultFuture;
+    public void statListForCollectingInitialization() {
+        synchronized (COLLECTION_STAT_TYPE_LOCK) {
+            final List<MultipartType> statListForCollecting = new ArrayList<>();
+            if (devState.isTableStatisticsAvailable()) {
+                statListForCollecting.add(MultipartType.OFPMPTABLE);
+            }
+            if (devState.isFlowStatisticsAvailable()) {
+                statListForCollecting.add(MultipartType.OFPMPFLOW);
+            }
+            if (devState.isGroupAvailable()) {
+                statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
+                statListForCollecting.add(MultipartType.OFPMPGROUP);
+            }
+            if (devState.isMetersAvailable()) {
+                statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
+                statListForCollecting.add(MultipartType.OFPMPMETER);
+            }
+            if (devState.isPortStatisticsAvailable()) {
+                statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
+            }
+            if (devState.isQueueStatisticsAvailable()) {
+                statListForCollecting.add(MultipartType.OFPMPQUEUE);
+            }
+            collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
         }
-        final Iterator<MultipartType> statIterator = collectingStatType.iterator();
-        final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
-        statChainFuture(statIterator, settableStatResultFuture);
-        return settableStatResultFuture;
     }
 
-    private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType) {
-        switch (multipartType) {
-            case OFPMPFLOW:
-                return collectFlowStatistics(multipartType);
-            case OFPMPTABLE:
-                return collectTableStatistics(multipartType);
-            case OFPMPPORTSTATS:
-                return collectPortStatistics(multipartType);
-            case OFPMPQUEUE:
-                return collectQueueStatistics(multipartType);
-            case OFPMPGROUPDESC:
-                return collectGroupDescStatistics(multipartType);
-            case OFPMPGROUP:
-                return collectGroupStatistics(multipartType);
-            case OFPMPMETERCONFIG:
-                return collectMeterConfigStatistics(multipartType);
-            case OFPMPMETER:
-                return collectMeterStatistics(multipartType);
-            default:
-                LOG.warn("Unsuported Statistics type {}", multipartType);
-                return Futures.immediateCheckedFuture(Boolean.TRUE);
+        @Override
+        public ListenableFuture<Boolean> gatherDynamicData() {
+            if (shuttingDownStatisticsPolling) {
+                LOG.debug("Statistics for device {} is not enabled.", deviceContext.getDeviceState().getNodeId());
+                return Futures.immediateFuture(Boolean.TRUE);
+            }
+            final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
+            if (errorResultFuture != null) {
+                return errorResultFuture;
+            }
+            synchronized (COLLECTION_STAT_TYPE_LOCK) {
+                final Iterator<MultipartType> statIterator = collectingStatType.iterator();
+                final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
+                statChainFuture(statIterator, settableStatResultFuture);
+                return settableStatResultFuture;
+            }
         }
-    }
 
-    @Override
-    public <T> RequestContext<T> createRequestContext() {
-        final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.getReservedXid()) {
-            @Override
-            public void close() {
-                requestContexts.remove(this);
+        private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType){
+            switch (multipartType) {
+                case OFPMPFLOW:
+                    return collectFlowStatistics(multipartType);
+                case OFPMPTABLE:
+                    return collectTableStatistics(multipartType);
+                case OFPMPPORTSTATS:
+                    return collectPortStatistics(multipartType);
+                case OFPMPQUEUE:
+                    return collectQueueStatistics(multipartType);
+                case OFPMPGROUPDESC:
+                    return collectGroupDescStatistics(multipartType);
+                case OFPMPGROUP:
+                    return collectGroupStatistics(multipartType);
+                case OFPMPMETERCONFIG:
+                    return collectMeterConfigStatistics(multipartType);
+                case OFPMPMETER:
+                    return collectMeterStatistics(multipartType);
+                default:
+                    LOG.warn("Unsuported Statistics type {}", multipartType);
+                    return Futures.immediateCheckedFuture(Boolean.TRUE);
             }
-        };
-        requestContexts.add(ret);
-        return ret;
-    }
-
-    @Override
-    public void close() {
-        for (final RequestContext<?> requestContext : requestContexts) {
-            RequestContextUtil.closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED);
         }
-        if (null != pollTimeout && !pollTimeout.isExpired()) {
-            pollTimeout.cancel();
-        }
-    }
 
-    @Override
-    public void setPollTimeout(Timeout pollTimeout) {
-        this.pollTimeout = pollTimeout;
-    }
 
-    @Override
-    public Optional<Timeout> getPollTimeout() {
-        return Optional.fromNullable(pollTimeout);
-    }
+        @Override
+        public <T> RequestContext<T> createRequestContext() {
+            final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.reservedXidForDeviceMessage()) {
+                @Override
+                public void close() {
+                    requestContexts.remove(this);
+                }
+            };
+            requestContexts.add(ret);
+            return ret;
+        }
+
+        @Override
+        public void close () {
+            for (final RequestContext<?> requestContext : requestContexts) {
+                RequestContextUtil.closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED);
+            }
+            if (null != pollTimeout && !pollTimeout.isExpired()) {
+                pollTimeout.cancel();
+            }
+        }
 
-    void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture) {
-        if ( ! iterator.hasNext()) {
-            resultFuture.set(Boolean.TRUE);
-            LOG.debug("Stats collection successfully finished for node {}", deviceContext.getDeviceState().getNodeId());
-            return;
+        @Override
+        public void setPollTimeout (Timeout pollTimeout){
+            this.pollTimeout = pollTimeout;
         }
 
-        final MultipartType nextType = iterator.next();
-        LOG.debug("Stats iterating to next type for node {} of type {}", deviceContext.getDeviceState().getNodeId(), nextType);
+        @Override
+        public Optional<Timeout> getPollTimeout () {
+            return Optional.fromNullable(pollTimeout);
+        }
 
-        final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType);
-        Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
-            @Override
-            public void onSuccess(final Boolean result) {
-                statChainFuture(iterator, resultFuture);
+        void statChainFuture ( final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture){
+            if (!iterator.hasNext()) {
+                resultFuture.set(Boolean.TRUE);
+                LOG.debug("Stats collection successfully finished for node {}", deviceContext.getDeviceState().getNodeId());
+                return;
             }
-            @Override
-            public void onFailure(final Throwable t) {
-                resultFuture.setException(t);
-            }
-        });
-    }
 
-    /**
-     * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
-     * which has to be returned from caller too
-     *
-     * @return
-     */
-    @VisibleForTesting
-    ListenableFuture<Boolean> deviceConnectionCheck() {
-        if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
-            ListenableFuture<Boolean> resultingFuture = SettableFuture.create();
-            switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
-                case RIP:
-                    final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
-                            deviceContext.getPrimaryConnectionContext().getConnectionState());
-                    resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
-                    break;
-                default:
-                    resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
-                    break;
+            final MultipartType nextType = iterator.next();
+            LOG.debug("Stats iterating to next type for node {} of type {}", deviceContext.getDeviceState().getNodeId(), nextType);
+
+            final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType);
+            Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
+                @Override
+                public void onSuccess(final Boolean result) {
+                    statChainFuture(iterator, resultFuture);
+                }
+
+                @Override
+                public void onFailure(final Throwable t) {
+                    resultFuture.setException(t);
+                }
+            });
+        }
+
+        /**
+         * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
+         * which has to be returned from caller too
+         *
+         * @return
+         */
+        @VisibleForTesting
+        ListenableFuture<Boolean> deviceConnectionCheck () {
+            if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
+                ListenableFuture<Boolean> resultingFuture = SettableFuture.create();
+                switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
+                    case RIP:
+                        final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
+                                deviceContext.getPrimaryConnectionContext().getConnectionState());
+                        resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
+                        break;
+                    default:
+                        resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
+                        break;
+                }
+                return resultingFuture;
             }
-            return resultingFuture;
+            return null;
         }
-        return null;
-    }
 
-    private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType) {
-        return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
-                statisticsGatheringOnTheFlyService, deviceContext, /*MultipartType.OFPMPFLOW*/ multipartType) : emptyFuture;
-    }
+        private ListenableFuture<Boolean> collectFlowStatistics ( final MultipartType multipartType){
+            return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+                    statisticsGatheringOnTheFlyService, deviceContext, /*MultipartType.OFPMPFLOW*/ multipartType) : emptyFuture;
+        }
 
-    private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
-        return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
-                statisticsGatheringService, deviceContext, /*MultipartType.OFPMPTABLE*/ multipartType) : emptyFuture;
-    }
+        private ListenableFuture<Boolean> collectTableStatistics ( final MultipartType multipartType){
+            return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+                    statisticsGatheringService, deviceContext, /*MultipartType.OFPMPTABLE*/ multipartType) : emptyFuture;
+        }
 
-    private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
-        return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
-                statisticsGatheringService, deviceContext, /*MultipartType.OFPMPPORTSTATS*/ multipartType) : emptyFuture;
-    }
+        private ListenableFuture<Boolean> collectPortStatistics ( final MultipartType multipartType){
+            return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+                    statisticsGatheringService, deviceContext, /*MultipartType.OFPMPPORTSTATS*/ multipartType) : emptyFuture;
+        }
 
-    private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
-        return devState.isQueueStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
-                statisticsGatheringService, deviceContext, /*MultipartType.OFPMPQUEUE*/ multipartType) : emptyFuture;
-    }
+        private ListenableFuture<Boolean> collectQueueStatistics ( final MultipartType multipartType){
+            return devState.isQueueStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+                    statisticsGatheringService, deviceContext, /*MultipartType.OFPMPQUEUE*/ multipartType) : emptyFuture;
+        }
 
-    private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
-        return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
-                statisticsGatheringService, deviceContext, /*MultipartType.OFPMPGROUPDESC*/ multipartType) : emptyFuture;
-    }
+        private ListenableFuture<Boolean> collectGroupDescStatistics ( final MultipartType multipartType){
+            return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+                    statisticsGatheringService, deviceContext, /*MultipartType.OFPMPGROUPDESC*/ multipartType) : emptyFuture;
+        }
 
-    private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
-        return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
-                statisticsGatheringService, deviceContext, /*MultipartType.OFPMPGROUP*/ multipartType) : emptyFuture;
-    }
+        private ListenableFuture<Boolean> collectGroupStatistics ( final MultipartType multipartType){
+            return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+                    statisticsGatheringService, deviceContext, /*MultipartType.OFPMPGROUP*/ multipartType) : emptyFuture;
+        }
 
-    private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
-        return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
-                statisticsGatheringService, deviceContext, /*MultipartType.OFPMPMETERCONFIG*/ multipartType) : emptyFuture;
-    }
+        private ListenableFuture<Boolean> collectMeterConfigStatistics ( final MultipartType multipartType){
+            return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+                    statisticsGatheringService, deviceContext, /*MultipartType.OFPMPMETERCONFIG*/ multipartType) : emptyFuture;
+        }
 
-    private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
-        return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
-                statisticsGatheringService, deviceContext, /*MultipartType.OFPMPMETER*/ multipartType) : emptyFuture;
-    }
+        private ListenableFuture<Boolean> collectMeterStatistics ( final MultipartType multipartType){
+            return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+                    statisticsGatheringService, deviceContext, /*MultipartType.OFPMPMETER*/ multipartType) : emptyFuture;
+        }
 
-    @VisibleForTesting
-    protected void setStatisticsGatheringService(StatisticsGatheringService statisticsGatheringService) {
-        this.statisticsGatheringService = statisticsGatheringService;
-    }
+        @VisibleForTesting
+        protected void setStatisticsGatheringService ( final StatisticsGatheringService statisticsGatheringService){
+            this.statisticsGatheringService = statisticsGatheringService;
+        }
 
-    @VisibleForTesting
-    protected void setStatisticsGatheringOnTheFlyService(StatisticsGatheringOnTheFlyService
-                                                             statisticsGatheringOnTheFlyService) {
-        this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
-    }
+        @VisibleForTesting
+        protected void setStatisticsGatheringOnTheFlyService ( final StatisticsGatheringOnTheFlyService
+        statisticsGatheringOnTheFlyService){
+            this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
+        }
 
-    @Override
-    public ItemLifecycleListener getItemLifeCycleListener() {
-        return itemLifeCycleListener;
+        @Override
+        public ItemLifecycleListener getItemLifeCycleListener () {
+            return itemLifeCycleListener;
+        }
     }
-}
index 375fae953d8843f64e16314d8e88a3d463b2a7ba..210ffe7274be270f5e3f70fc175d8d1c297788a5 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.openflowplugin.impl.statistics;
 
+import javax.annotation.CheckForNull;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -18,6 +19,7 @@ import java.util.concurrent.TimeoutException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -49,7 +51,8 @@ import org.slf4j.LoggerFactory;
 public class StatisticsManagerImpl implements StatisticsManager, StatisticsManagerControlService {
 
     private static final Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
-    private final RpcProviderRegistry rpcProviderRegistry;
+
+    private static final long DEFAULT_STATS_TIMEOUT_SEC = 50L;
 
     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
 
@@ -62,7 +65,7 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
     private static long maximumTimerDelay = 900000; //wait max 15 minutes for next statistics
 
     private StatisticsWorkMode workMode = StatisticsWorkMode.COLLECTALL;
-    private Semaphore workModeGuard = new Semaphore(1, true);
+    private final Semaphore workModeGuard = new Semaphore(1, true);
     private boolean shuttingDownStatisticsPolling;
     private BindingAwareBroker.RpcRegistration<StatisticsManagerControlService> controlServiceRegistration;
 
@@ -71,13 +74,9 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
         deviceInitPhaseHandler = handler;
     }
 
-    public StatisticsManagerImpl(RpcProviderRegistry rpcProviderRegistry) {
-        this.rpcProviderRegistry = rpcProviderRegistry;
+    public StatisticsManagerImpl(@CheckForNull final RpcProviderRegistry rpcProviderRegistry, final boolean shuttingDownStatisticsPolling) {
+        Preconditions.checkArgument(rpcProviderRegistry != null);
         controlServiceRegistration = rpcProviderRegistry.addRpcImplementation(StatisticsManagerControlService.class, this);
-    }
-
-    public StatisticsManagerImpl(RpcProviderRegistry rpcProviderRegistry, final boolean shuttingDownStatisticsPolling) {
-        this(rpcProviderRegistry);
         this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling;
     }
 
@@ -89,20 +88,26 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
             LOG.trace("This is first device that delivered timer. Starting statistics polling immediately.");
             hashedWheelTimer = deviceContext.getTimer();
         }
-
-        LOG.info("Starting Statistics for master role for node:{}", deviceContext.getDeviceState().getNodeId());
-
-        final StatisticsContext statisticsContext = new StatisticsContextImpl(deviceContext);
+        final StatisticsContext statisticsContext = new StatisticsContextImpl(deviceContext, shuttingDownStatisticsPolling);
         deviceContext.addDeviceContextClosedHandler(this);
 
-        if (deviceContext.getDeviceState().getRole() == OfpRole.BECOMESLAVE) {
-            // if slave, we dont poll for statistics and jump to rpc initialization
-            LOG.info("Skipping Statistics for slave role for node:{}", deviceContext.getDeviceState().getNodeId());
+        if (shuttingDownStatisticsPolling) {
+            LOG.info("Statistics is shutdown for node:{}", deviceContext.getDeviceState().getNodeId());
+        } else {
+            LOG.info("Schedule Statistics poll for node:{}", deviceContext.getDeviceState().getNodeId());
+            if (OfpRole.BECOMEMASTER.equals(deviceContext.getDeviceState().getRole())) {
+                initialStatPollForMaster(statisticsContext, deviceContext);
+                /* we want to wait for initial statCollecting response */
+                return;
+            }
             scheduleNextPolling(deviceContext, statisticsContext, new TimeCounter());
-            deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
-            return;
         }
+        contexts.put(deviceContext, statisticsContext);
+        deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
+        deviceContext.getDeviceState().setDeviceSynchronized(true);
+    }
 
+    private void initialStatPollForMaster(final StatisticsContext statisticsContext, final DeviceContext deviceContext) {
         final ListenableFuture<Boolean> weHaveDynamicData = statisticsContext.gatherDynamicData();
         Futures.addCallback(weHaveDynamicData, new FutureCallback<Boolean>() {
             @Override
@@ -111,6 +116,7 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
                     //there are some statistics on device worth gathering
                     contexts.put(deviceContext, statisticsContext);
                     final TimeCounter timeCounter = new TimeCounter();
+                    deviceContext.getDeviceState().setStatisticsPollingEnabledProp(true);
                     scheduleNextPolling(deviceContext, statisticsContext, timeCounter);
                     LOG.trace("Device dynamic info collecting done. Going to announce raise to next level.");
                     try {
@@ -123,22 +129,15 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
                     deviceContext.getDeviceState().setDeviceSynchronized(true);
                 } else {
                     final String deviceAdress = deviceContext.getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress().toString();
-                    try {
-                        deviceContext.close();
-                    } catch (Exception e) {
-                        LOG.info("Statistics for device {} could not be gathered. Closing its device context.", deviceAdress);
-                    }
+                    LOG.info("Statistics for device {} could not be gathered. Closing its device context.", deviceAdress);
+                    deviceContext.close();
                 }
             }
 
             @Override
             public void onFailure(final Throwable throwable) {
                 LOG.warn("Statistics manager was not able to collect dynamic info for device.", deviceContext.getDeviceState().getNodeId(), throwable);
-                try {
-                    deviceContext.close();
-                } catch (Exception e) {
-                    LOG.warn("Error closing device context.", e);
-                }
+                deviceContext.close();
             }
         });
     }
@@ -151,14 +150,20 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
             LOG.debug("Session for device {} is not valid.", deviceContext.getDeviceState().getNodeId().getValue());
             return;
         }
-        LOG.debug("POLLING ALL STATS for device: {}", deviceContext.getDeviceState().getNodeId().getValue());
+        if (!deviceContext.getDeviceState().isStatisticsPollingEnabled()) {
+            LOG.debug("StatisticsPolling is disabled for device: {} , try later", deviceContext.getDeviceState().getNodeId());
+            scheduleNextPolling(deviceContext, statisticsContext, timeCounter);
+            return;
+        }
         if (OfpRole.BECOMESLAVE.equals(deviceContext.getDeviceState().getRole())) {
             LOG.debug("Role is SLAVE so we don't want to poll any stat for device: {}", deviceContext.getDeviceState().getNodeId());
             scheduleNextPolling(deviceContext, statisticsContext, timeCounter);
             return;
         }
+
+        LOG.debug("POLLING ALL STATS for device: {}", deviceContext.getDeviceState().getNodeId().getValue());
         timeCounter.markStart();
-        ListenableFuture<Boolean> deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
+        final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
         Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(final Boolean o) {
@@ -177,14 +182,20 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
             }
         });
 
-        final long STATS_TIMEOUT_SEC = 20L;
-        try {
-            deviceStatisticsCollectionFuture.get(STATS_TIMEOUT_SEC, TimeUnit.SECONDS);
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.warn("Statistics collection for node {} failed", deviceContext.getDeviceState().getNodeId(), e);
-        } catch (final TimeoutException e) {
-            LOG.info("Statistics collection for node {} still in progress even after {} secs", deviceContext.getDeviceState().getNodeId(), STATS_TIMEOUT_SEC);
-        }
+        final long averangeTime = TimeUnit.MILLISECONDS.toSeconds(timeCounter.getAverageTimeBetweenMarks());
+        final long STATS_TIMEOUT_SEC = averangeTime > 0 ? 3 * averangeTime : DEFAULT_STATS_TIMEOUT_SEC;
+        final TimerTask timerTask = new TimerTask() {
+
+            @Override
+            public void run(final Timeout timeout) throws Exception {
+                if (!deviceStatisticsCollectionFuture.isDone()) {
+                    LOG.info("Statistics collection for node {} still in progress even after {} secs", deviceContext
+                            .getDeviceState().getNodeId(), STATS_TIMEOUT_SEC);
+                    deviceStatisticsCollectionFuture.cancel(true);
+                }
+            }
+        };
+        deviceContext.getTimer().newTimeout(timerTask, STATS_TIMEOUT_SEC, TimeUnit.SECONDS);
     }
 
     private void scheduleNextPolling(final DeviceContext deviceContext,
@@ -193,7 +204,7 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
         if (null != hashedWheelTimer) {
             LOG.debug("SCHEDULING NEXT STATS POLLING for device: {}", deviceContext.getDeviceState().getNodeId().getValue());
             if (!shuttingDownStatisticsPolling) {
-                Timeout pollTimeout = hashedWheelTimer.newTimeout(new TimerTask() {
+                final Timeout pollTimeout = hashedWheelTimer.newTimeout(new TimerTask() {
                     @Override
                     public void run(final Timeout timeout) throws Exception {
                         pollStatistics(deviceContext, statisticsContext, timeCounter);
@@ -208,7 +219,7 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
 
     @VisibleForTesting
     protected void calculateTimerDelay(final TimeCounter timeCounter) {
-        long averageStatisticsGatheringTime = timeCounter.getAverageTimeBetweenMarks();
+        final long averageStatisticsGatheringTime = timeCounter.getAverageTimeBetweenMarks();
         if (averageStatisticsGatheringTime > currentTimerDelay) {
             currentTimerDelay *= 2;
             if (currentTimerDelay > maximumTimerDelay) {
@@ -230,20 +241,16 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
 
     @Override
     public void onDeviceContextClosed(final DeviceContext deviceContext) {
-        StatisticsContext statisticsContext = contexts.remove(deviceContext);
+        final StatisticsContext statisticsContext = contexts.remove(deviceContext);
         if (null != statisticsContext) {
             LOG.trace("Removing device context from stack. No more statistics gathering for node {}", deviceContext.getDeviceState().getNodeId());
-            try {
-                statisticsContext.close();
-            } catch (Exception e) {
-                LOG.debug("Error closing statistic context for node {}.", deviceContext.getDeviceState().getNodeId());
-            }
+            statisticsContext.close();
         }
     }
 
     @Override
     public Future<RpcResult<GetStatisticsWorkModeOutput>> getStatisticsWorkMode() {
-        GetStatisticsWorkModeOutputBuilder smModeOutputBld = new GetStatisticsWorkModeOutputBuilder();
+        final GetStatisticsWorkModeOutputBuilder smModeOutputBld = new GetStatisticsWorkModeOutputBuilder();
         smModeOutputBld.setMode(workMode);
         return RpcResultBuilder.success(smModeOutputBld.build()).buildFuture();
     }
@@ -257,13 +264,13 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
             if (!workMode.equals(targetWorkMode)) {
                 shuttingDownStatisticsPolling = StatisticsWorkMode.FULLYDISABLED.equals(targetWorkMode);
                 // iterate through stats-ctx: propagate mode
-                for (Map.Entry<DeviceContext, StatisticsContext> contextEntry : contexts.entrySet()) {
+                for (final Map.Entry<DeviceContext, StatisticsContext> contextEntry : contexts.entrySet()) {
                     final DeviceContext deviceContext = contextEntry.getKey();
                     final StatisticsContext statisticsContext = contextEntry.getValue();
                     switch (targetWorkMode) {
                         case COLLECTALL:
                             scheduleNextPolling(deviceContext, statisticsContext, new TimeCounter());
-                            for (ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
+                            for (final ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
                                 lifeCycleSource.setItemLifecycleListener(null);
                             }
                             break;
@@ -272,7 +279,7 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
                             if (pollTimeout.isPresent()) {
                                 pollTimeout.get().cancel();
                             }
-                            for (ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
+                            for (final ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
                                 lifeCycleSource.setItemLifecycleListener(statisticsContext.getItemLifeCycleListener());
                             }
                             break;
@@ -298,5 +305,9 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
             controlServiceRegistration.close();
             controlServiceRegistration = null;
         }
+        for (final StatisticsContext statCtx : contexts.values()) {
+            statCtx.close();
+        }
+        contexts.clear();
     }
 }
index 09310a4e397f6dddf630c1626f7182fac10c65f0..edc63863781bea47d86e793a740cc4017b2fa15c 100644 (file)
@@ -435,7 +435,7 @@ public class DeviceInitializationUtils {
 
         final OutboundQueue queue = deviceContext.getPrimaryConnectionContext().getOutboundQueueProvider();
 
-        final Long reserved = deviceContext.getReservedXid();
+        final Long reserved = deviceContext.reservedXidForDeviceMessage();
         final RequestContext<List<MultipartReply>> requestContext = new AbstractRequestContext<List<MultipartReply>>(
                 reserved) {
             @Override
index c7edc72d8a82b065c32f367ca49be9ff3901f001..aee32aea5784269831b51cb1ea7e1b1fbb991c7e 100644 (file)
@@ -10,7 +10,10 @@ package org.opendaylight.openflowplugin.impl.util;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import com.google.common.reflect.TypeToken;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.CheckForNull;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
@@ -61,42 +64,71 @@ public class MdSalRegistratorUtils {
         throw new IllegalStateException();
     }
 
+    /**
+     * Method registers all OF services for role {@link OfpRole#BECOMEMASTER}
+     * @param rpcContext - registration processing is implemented in {@link RpcContext}
+     * @param deviceContext - every service needs {@link DeviceContext} as input parameter
+     * @param newRole - role validation for {@link OfpRole#BECOMEMASTER}
+     */
+    public static void registerMasterServices(@CheckForNull final RpcContext rpcContext,
+            @CheckForNull final DeviceContext deviceContext, @CheckForNull final OfpRole newRole) {
+        Preconditions.checkArgument(rpcContext != null);
+        Preconditions.checkArgument(deviceContext != null);
+        Preconditions.checkArgument(newRole != null);
+        Verify.verify(OfpRole.BECOMEMASTER.equals(newRole), "Service call with bad Role {} we expect role BECOMEMASTER", newRole);
 
-    public static void registerServices(final RpcContext rpcContext, final DeviceContext deviceContext, final OfpRole newRole) {
         rpcContext.registerRpcServiceImplementation(SalEchoService.class, new SalEchoServiceImpl(rpcContext, deviceContext));
 
-        if (OfpRole.BECOMEMASTER.equals(newRole)) {
-            rpcContext.registerRpcServiceImplementation(SalFlowService.class, new SalFlowServiceImpl(rpcContext, deviceContext));
-            //TODO: add constructors with rcpContext and deviceContext to meter, group, table constructors
-            rpcContext.registerRpcServiceImplementation(FlowCapableTransactionService.class, new FlowCapableTransactionServiceImpl(rpcContext, deviceContext));
-            rpcContext.registerRpcServiceImplementation(SalMeterService.class, new SalMeterServiceImpl(rpcContext, deviceContext));
-            rpcContext.registerRpcServiceImplementation(SalGroupService.class, new SalGroupServiceImpl(rpcContext, deviceContext));
-            rpcContext.registerRpcServiceImplementation(SalTableService.class, new SalTableServiceImpl(rpcContext, deviceContext));
-            rpcContext.registerRpcServiceImplementation(SalPortService.class, new SalPortServiceImpl(rpcContext, deviceContext));
-            rpcContext.registerRpcServiceImplementation(PacketProcessingService.class, new PacketProcessingServiceImpl(rpcContext, deviceContext));
-            rpcContext.registerRpcServiceImplementation(NodeConfigService.class, new NodeConfigServiceImpl(rpcContext, deviceContext));
-            rpcContext.registerRpcServiceImplementation(OpendaylightFlowStatisticsService.class, new OpendaylightFlowStatisticsServiceImpl(rpcContext, deviceContext));
-            // TODO: experimenter symmetric and multipart message services
-            rpcContext.registerRpcServiceImplementation(SalExperimenterMessageService.class, new SalExperimenterMessageServiceImpl(rpcContext, deviceContext));
+        rpcContext.registerRpcServiceImplementation(SalFlowService.class, new SalFlowServiceImpl(rpcContext, deviceContext));
+        //TODO: add constructors with rcpContext and deviceContext to meter, group, table constructors
+        rpcContext.registerRpcServiceImplementation(FlowCapableTransactionService.class, new FlowCapableTransactionServiceImpl(rpcContext, deviceContext));
+        rpcContext.registerRpcServiceImplementation(SalMeterService.class, new SalMeterServiceImpl(rpcContext, deviceContext));
+        rpcContext.registerRpcServiceImplementation(SalGroupService.class, new SalGroupServiceImpl(rpcContext, deviceContext));
+        rpcContext.registerRpcServiceImplementation(SalTableService.class, new SalTableServiceImpl(rpcContext, deviceContext));
+        rpcContext.registerRpcServiceImplementation(SalPortService.class, new SalPortServiceImpl(rpcContext, deviceContext));
+        rpcContext.registerRpcServiceImplementation(PacketProcessingService.class, new PacketProcessingServiceImpl(rpcContext, deviceContext));
+        rpcContext.registerRpcServiceImplementation(NodeConfigService.class, new NodeConfigServiceImpl(rpcContext, deviceContext));
+        rpcContext.registerRpcServiceImplementation(OpendaylightFlowStatisticsService.class, new OpendaylightFlowStatisticsServiceImpl(rpcContext, deviceContext));
+        // TODO: experimenter symmetric and multipart message services
+        rpcContext.registerRpcServiceImplementation(SalExperimenterMessageService.class, new SalExperimenterMessageServiceImpl(rpcContext, deviceContext));
+    }
+
+    /**
+     * Method unregisters all services in first step. So we don't need to call {@link MdSalRegistratorUtils#unregisterServices(RpcContext)}
+     * directly before by change role from {@link OfpRole#BECOMEMASTER} to {@link OfpRole#BECOMESLAVE}.
+     * Method registers {@link SalEchoService} in next step only because we would like to have SalEchoService as local service for all apps
+     * to be able actively check connection status for slave connection too.
+     * @param rpcContext - registration/unregistration processing is implemented in {@link RpcContext}
+     * @param newRole - role validation for {@link OfpRole#BECOMESLAVE}
+     */
+    public static void registerSlaveServices(@CheckForNull final RpcContext rpcContext, @CheckForNull final OfpRole newRole) {
+        Preconditions.checkArgument(rpcContext != null);
+        Preconditions.checkArgument(newRole != null);
+        Verify.verify(OfpRole.BECOMESLAVE.equals(newRole), "Service call with bad Role {} we expect role BECOMESLAVE", newRole);
 
-        } else if (OfpRole.BECOMESLAVE.equals(newRole)) {
-            rpcContext.unregisterRpcServiceImplementation(SalFlowService.class);
-            //TODO: add constructors with rcpContext and deviceContext to meter, group, table constructors
-            rpcContext.unregisterRpcServiceImplementation(FlowCapableTransactionService.class);
-            rpcContext.unregisterRpcServiceImplementation(SalMeterService.class);
-            rpcContext.unregisterRpcServiceImplementation(SalGroupService.class);
-            rpcContext.unregisterRpcServiceImplementation(SalTableService.class);
-           rpcContext.unregisterRpcServiceImplementation(SalPortService.class);
-            rpcContext.unregisterRpcServiceImplementation(PacketProcessingService.class);
-            rpcContext.unregisterRpcServiceImplementation(NodeConfigService.class);
-            rpcContext.unregisterRpcServiceImplementation(OpendaylightFlowStatisticsService.class);
-            // TODO: experimenter symmetric and multipart message services
-            rpcContext.unregisterRpcServiceImplementation(SalExperimenterMessageService.class);
-        }
+        unregisterServices(rpcContext);
     }
 
-    public static void unregisterServices(final RpcContext rpcContext) throws Exception {
-        rpcContext.close();
+    /**
+     * Method unregisters all OF services.
+     * @param rpcContext - unregistration processing is implemented in {@link RpcContext}
+     */
+    public static void unregisterServices(@CheckForNull final RpcContext rpcContext) {
+        Preconditions.checkArgument(rpcContext != null);
+
+        rpcContext.unregisterRpcServiceImplementation(SalEchoService.class);
+        rpcContext.unregisterRpcServiceImplementation(SalFlowService.class);
+        //TODO: add constructors with rcpContext and deviceContext to meter, group, table constructors
+        rpcContext.unregisterRpcServiceImplementation(FlowCapableTransactionService.class);
+        rpcContext.unregisterRpcServiceImplementation(SalMeterService.class);
+        rpcContext.unregisterRpcServiceImplementation(SalGroupService.class);
+        rpcContext.unregisterRpcServiceImplementation(SalTableService.class);
+        rpcContext.unregisterRpcServiceImplementation(SalPortService.class);
+        rpcContext.unregisterRpcServiceImplementation(PacketProcessingService.class);
+        rpcContext.unregisterRpcServiceImplementation(NodeConfigService.class);
+        rpcContext.unregisterRpcServiceImplementation(OpendaylightFlowStatisticsService.class);
+        // TODO: experimenter symmetric and multipart message services
+        rpcContext.unregisterRpcServiceImplementation(SalExperimenterMessageService.class);
     }
 
     /**
@@ -116,7 +148,7 @@ public class MdSalRegistratorUtils {
                 rpcContext.lookupRpcService(OpendaylightFlowStatisticsService.class));
         Preconditions.checkArgument(COMPOSITE_SERVICE_TYPE_TOKEN.isAssignableFrom(flowStatisticsService.getClass()));
         // attach delegate to flow statistics service (to cover all but aggregated stats with match filter input)
-        OpendaylightFlowStatisticsServiceDelegateImpl flowStatisticsDelegate =
+        final OpendaylightFlowStatisticsServiceDelegateImpl flowStatisticsDelegate =
                 new OpendaylightFlowStatisticsServiceDelegateImpl(rpcContext, deviceContext, notificationPublishService, new AtomicLong());
         ((Delegator<OpendaylightFlowStatisticsService>) flowStatisticsService).setDelegate(flowStatisticsDelegate);
 
index 7a067d9e1e5d0365897528212f5da1f11fe50017..7be1d00e1610a038baa1b1fc4743f0e357eca288 100644 (file)
@@ -207,7 +207,7 @@ public class DeviceContextImplTest {
                 org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved.class.getName()))))
                 .thenReturn(messageTranslatorFlowRemoved);
 
-        deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary);
+        deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, false);
 
         xid = new Xid(atomicLong.incrementAndGet());
         xidMulti = new Xid(atomicLong.incrementAndGet());
@@ -215,17 +215,17 @@ public class DeviceContextImplTest {
 
     @Test(expected = NullPointerException.class)
     public void testDeviceContextImplConstructorNullDataBroker() throws Exception {
-        new DeviceContextImpl(connectionContext, deviceState, null, timer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary).close();
+        new DeviceContextImpl(connectionContext, deviceState, null, timer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, false).close();
     }
 
     @Test(expected = NullPointerException.class)
     public void testDeviceContextImplConstructorNullDeviceState() throws Exception {
-        new DeviceContextImpl(connectionContext, null, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary).close();
+        new DeviceContextImpl(connectionContext, null, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, false).close();
     }
 
     @Test(expected = NullPointerException.class)
     public void testDeviceContextImplConstructorNullTimer() throws Exception {
-        new DeviceContextImpl(null, deviceState, dataBroker, null, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary).close();
+        new DeviceContextImpl(null, deviceState, dataBroker, null, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, false).close();
     }
 
     @Test
@@ -255,7 +255,7 @@ public class DeviceContextImplTest {
 
     @Test
     public void testGetReservedXid() {
-        deviceContext.getReservedXid();
+        deviceContext.reservedXidForDeviceMessage();
         verify(outboundQueueProvider).reserveEntry();
     }
 
index 94ef781bcad22c4c68eba75298c22381a11c4a4b..e4154f6d9afe641ce638f95a05743153da7abd9a 100644 (file)
@@ -146,7 +146,7 @@ public class DeviceManagerImplTest {
 
         MessageIntelligenceAgency mockedMessageIntelligenceAgency = mock(MessageIntelligenceAgency.class);
         DeviceManagerImpl deviceManager = new DeviceManagerImpl(mockedDataBroker, mockedMessageIntelligenceAgency,
-                TEST_VALUE_GLOBAL_NOTIFICATION_QUOTA);
+                TEST_VALUE_GLOBAL_NOTIFICATION_QUOTA, false);
         deviceManager.setDeviceInitializationPhaseHandler(deviceInitPhaseHandler);
 
         return deviceManager;
index b604a8e49964aee60cf30dd1fb164834c68726b0..9779f941327d5a57832a663dca39772081fe78a1 100644 (file)
@@ -9,7 +9,6 @@ package org.opendaylight.openflowplugin.impl.role;
 
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -34,6 +33,7 @@ import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
+import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
@@ -83,6 +83,8 @@ public class RoleContextImplTest {
 
     @Mock
     private FeaturesReply featuresReply;
+    @Mock
+    private MessageSpy mockedMessageSpy;
 
     private final NodeId nodeId = NodeId.getDefaultInstance("openflow:1");
     private final KeyedInstanceIdentifier<Node, NodeKey> instanceIdentifier = DeviceStateUtil.createNodeInstanceIdentifier(nodeId);
@@ -94,15 +96,17 @@ public class RoleContextImplTest {
     public void setup() throws CandidateAlreadyRegisteredException {
         when(deviceContext.getPrimaryConnectionContext()).thenReturn(connectionContext);
         when(deviceContext.getDeviceState()).thenReturn(deviceState);
+        when(deviceContext.getMessageSpy()).thenReturn(mockedMessageSpy);
         when(connectionContext.getNodeId()).thenReturn(nodeId);
         when(deviceState.getNodeInstanceIdentifier()).thenReturn(instanceIdentifier);
         when(deviceState.getNodeId()).thenReturn(nodeId);
         when(rpcProviderRegistry.getRpcService(SalRoleService.class)).thenReturn(salRoleService);
         when(deviceState.getFeatures()).thenReturn(getFeaturesOutput);
-        when(getFeaturesOutput.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_3);
+        when(getFeaturesOutput.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_0);
         when(deviceContext.getPrimaryConnectionContext().getFeatures()).thenReturn(featuresReply);
         when(deviceContext.getPrimaryConnectionContext().getConnectionState()).thenReturn(ConnectionContext.CONNECTION_STATE.WORKING);
-        when(deviceContext.onClusterRoleChange(Matchers.<OfpRole>any())).thenReturn(Futures.immediateFuture((Void) null));
+        when(deviceContext.onClusterRoleChange(Matchers.<OfpRole>any(), Matchers.<OfpRole>any()))
+                .thenReturn(Futures.immediateFuture((Void) null));
 
         roleContext = new RoleContextImpl(deviceContext, entityOwnershipService, entity, txEntity);
         roleContext.initialization();
@@ -123,7 +127,7 @@ public class RoleContextImplTest {
         final ListenableFuture<Void> onRoleChanged = roleContext.onRoleChanged(oldRole, newRole);
         onRoleChanged.get(FUTURE_SAFETY_TIMEOUT, TimeUnit.SECONDS);
 
-        verify(deviceContext).onClusterRoleChange(newRole);
+        verify(deviceContext).onClusterRoleChange(oldRole, newRole);
     }
 
     @Test
@@ -141,7 +145,7 @@ public class RoleContextImplTest {
         final ListenableFuture<Void> onRoleChanged = roleContext.onRoleChanged(oldRole, newRole);
         onRoleChanged.get(5, TimeUnit.SECONDS);
 
-        verify(deviceContext, Mockito.never()).onClusterRoleChange(newRole);
+        verify(deviceContext, Mockito.never()).onClusterRoleChange(oldRole, newRole);
     }
 
     @Test
@@ -155,12 +159,11 @@ public class RoleContextImplTest {
                 .thenReturn(future);
 
         roleContext.setSalRoleService(salRoleService);
-        roleContext.promoteStateToWorking();
 
         final ListenableFuture<Void> onRoleChanged = roleContext.onRoleChanged(oldRole, newRole);
         onRoleChanged.get(5, TimeUnit.SECONDS);
 
-        verify(deviceContext).onClusterRoleChange(newRole);
+        verify(deviceContext).onClusterRoleChange(oldRole, newRole);
     }
 
     @Test
@@ -179,7 +182,7 @@ public class RoleContextImplTest {
         final ListenableFuture<Void> onRoleChanged = roleContext.onRoleChanged(oldRole, newRole);
         onRoleChanged.get(5, TimeUnit.SECONDS);
 
-        verify(deviceContext).onClusterRoleChange(newRole);
+        verify(deviceContext).onClusterRoleChange(oldRole, newRole);
     }
 
     private class SetRoleInputMatcher extends ArgumentMatcher<SetRoleInput> {
index 61cf8deae50b648996661a3b3dd37aa6b5998779..1b9f7963d68821db02f6a4fae854f2cf30961d3b 100644 (file)
@@ -15,9 +15,7 @@ import static com.google.common.util.concurrent.Futures.immediateFuture;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Arrays;
 import java.util.Collections;
@@ -36,8 +34,8 @@ import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 public class StatisticsContextImplParamTest extends StatisticsContextImpMockInitiation {
 
 
-    public StatisticsContextImplParamTest(boolean isTable, boolean isFlow, boolean isGroup, boolean isMeter, boolean isPort,
-                                          boolean isQueue) {
+    public StatisticsContextImplParamTest(final boolean isTable, final boolean isFlow, final boolean isGroup, final boolean isMeter, final boolean isPort,
+                                          final boolean isQueue) {
         super();
         this.isTable = isTable;
         this.isFlow = isFlow;
@@ -65,9 +63,9 @@ public class StatisticsContextImplParamTest extends StatisticsContextImpMockInit
     @Test
     public void gatherDynamicDataTest() {
 
-        StatisticsContextImpl statisticsContext = new StatisticsContextImpl(mockedDeviceContext);
+        final StatisticsContextImpl statisticsContext = new StatisticsContextImpl(mockedDeviceContext, false);
 
-        ListenableFuture<RpcResult<List<MultipartReply>>> rpcResult = immediateFuture(RpcResultBuilder.success(Collections.<MultipartReply>emptyList()).build());
+        final ListenableFuture<RpcResult<List<MultipartReply>>> rpcResult = immediateFuture(RpcResultBuilder.success(Collections.<MultipartReply>emptyList()).build());
         when(mockedStatisticsGatheringService.getStatisticsOfType(any(EventIdentifier.class), any(MultipartType
                 .class))).thenReturn(rpcResult);
         when(mockedStatisticsOnFlyGatheringService.getStatisticsOfType(any(EventIdentifier.class), any(MultipartType
@@ -76,7 +74,7 @@ public class StatisticsContextImplParamTest extends StatisticsContextImpMockInit
         statisticsContext.setStatisticsGatheringService(mockedStatisticsGatheringService);
         statisticsContext.setStatisticsGatheringOnTheFlyService(mockedStatisticsOnFlyGatheringService);
 
-        ListenableFuture<Boolean> futureResult = statisticsContext.gatherDynamicData();
+        final ListenableFuture<Boolean> futureResult = statisticsContext.gatherDynamicData();
 
         try {
             assertTrue(futureResult.get());
index 25a153592e731dc210d9b4443691afdc7c44f719..5035051822de4ae035ffabe140175162747030e6 100644 (file)
@@ -16,7 +16,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Collections;
@@ -48,19 +47,19 @@ public class StatisticsContextImplTest extends StatisticsContextImpMockInitiatio
 
     @Before
     public void setUp() throws Exception {
-        when(mockedDeviceContext.getReservedXid()).thenReturn(TEST_XID);
+        when(mockedDeviceContext.reservedXidForDeviceMessage()).thenReturn(TEST_XID);
         initStatisticsContext();
     }
 
     private void initStatisticsContext() {
-        statisticsContext = new StatisticsContextImpl(mockedDeviceContext);
+        statisticsContext = new StatisticsContextImpl(mockedDeviceContext, false);
         statisticsContext.setStatisticsGatheringService(mockedStatisticsGatheringService);
         statisticsContext.setStatisticsGatheringOnTheFlyService(mockedStatisticsOnFlyGatheringService);
     }
 
     @Test
     public void testCreateRequestContext() {
-        RequestContext<Object> requestContext = statisticsContext.createRequestContext();
+        final RequestContext<Object> requestContext = statisticsContext.createRequestContext();
         assertNotNull(requestContext);
         assertEquals(TEST_XID, requestContext.getXid().getValue());
         Assert.assertFalse(requestContext.getFuture().isDone());
@@ -71,7 +70,7 @@ public class StatisticsContextImplTest extends StatisticsContextImpMockInitiatio
      */
     @Test
     public void testClose() throws Exception {
-        StatisticsContextImpl statisticsContext = new StatisticsContextImpl(mockedDeviceContext);
+        final StatisticsContextImpl statisticsContext = new StatisticsContextImpl(mockedDeviceContext, false);
         final RequestContext<Object> requestContext = statisticsContext.createRequestContext();
         statisticsContext.close();
         try {
@@ -79,7 +78,7 @@ public class StatisticsContextImplTest extends StatisticsContextImpMockInitiatio
             final RpcResult<?> rpcResult = requestContext.getFuture().get();
             Assert.assertFalse(rpcResult.isSuccessful());
             Assert.assertFalse(rpcResult.isSuccessful());
-        } catch (Exception e) {
+        } catch (final Exception e) {
             LOG.error("request future value should be finished", e);
             Assert.fail("request context closing failed");
         }
@@ -139,7 +138,7 @@ public class StatisticsContextImplTest extends StatisticsContextImpMockInitiatio
         try {
             deviceConnectionCheckResult.get();
             Assert.fail("connection in state RIP should have caused exception here");
-        } catch (Exception e) {
+        } catch (final Exception e) {
             LOG.debug("expected behavior for RIP connection achieved");
             Assert.assertTrue(e instanceof ExecutionException);
         }
@@ -155,7 +154,7 @@ public class StatisticsContextImplTest extends StatisticsContextImpMockInitiatio
         try {
             final Boolean checkPositive = deviceConnectionCheckResult.get();
             Assert.assertTrue(checkPositive);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             Assert.fail("connection in state HANDSHAKING should NOT have caused exception here");
         }
     }
index 8c0b0bc2e72149962c5c652dd17040c2c3265be6..7a27bb3177749210dcf25ef9fe26a673818dea70 100644 (file)
@@ -142,7 +142,7 @@ public class StatisticsManagerImplTest {
                 Matchers.eq(StatisticsManagerControlService.class),
                 Matchers.<StatisticsManagerControlService>any())).thenReturn(serviceControlRegistration);
 
-        statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry);
+        statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, false);
     }
 
     @Test
@@ -163,7 +163,7 @@ public class StatisticsManagerImplTest {
         statisticsManager.onDeviceContextLevelUp(mockedDeviceContext);
 
         verify(mockedDeviceContext).addDeviceContextClosedHandler(statisticsManager);
-        verify(mockedDeviceContext, Mockito.times(8)).getReservedXid();
+        verify(mockedDeviceContext, Mockito.never()).reservedXidForDeviceMessage();
         verify(mockedDeviceState).setDeviceSynchronized(true);
         verify(mockedDevicePhaseHandler).onDeviceContextLevelUp(mockedDeviceContext);
         verify(hashedWheelTimer).newTimeout(Matchers.<TimerTask>any(), Matchers.anyLong(), Matchers.<TimeUnit>any());
@@ -188,7 +188,7 @@ public class StatisticsManagerImplTest {
         statisticsManager.onDeviceContextLevelUp(mockedDeviceContext);
 
         verify(mockedDeviceContext).addDeviceContextClosedHandler(statisticsManager);
-        verify(mockedDeviceContext, Mockito.times(8)).getReservedXid();
+        verify(mockedDeviceContext, Mockito.never()).reservedXidForDeviceMessage();
         verify(mockedDeviceState).setDeviceSynchronized(true);
         verify(mockedDevicePhaseHandler).onDeviceContextLevelUp(mockedDeviceContext);
         verify(hashedWheelTimer, Mockito.never()).newTimeout(Matchers.<TimerTask>any(), Matchers.anyLong(), Matchers.<TimeUnit>any());
index bb2b353d02374aa5b8d0c4964ed6fc5658bda9be..f8f08b6ad38d483cc6956d76d2eee2b4f331ff21 100644 (file)
@@ -30,7 +30,7 @@ import org.opendaylight.yangtools.yang.binding.RpcService;
 public class MdSalRegistratorUtilsTest {
 
     /**
-     * Number of currently registrated services (can be changed) in {@link MdSalRegistratorUtils#registerServices
+     * Number of currently registrated services (can be changed) in {@link MdSalRegistratorUtils#registerMasterServices
      * (RpcContext, DeviceContext)}
      */
     private static final int NUMBER_OF_RPC_SERVICE_REGISTRATION = 11;
@@ -49,7 +49,7 @@ public class MdSalRegistratorUtilsTest {
         when(mockedFeatures.getDatapathId()).thenReturn(mockedDataPathId);
 
         when(mockedDeviceContext.getPrimaryConnectionContext()).thenReturn(mockedConnectionContext);
-        MdSalRegistratorUtils.registerServices(mockedRpcContext,mockedDeviceContext, OfpRole.BECOMEMASTER);
+        MdSalRegistratorUtils.registerMasterServices(mockedRpcContext,mockedDeviceContext, OfpRole.BECOMEMASTER);
         verify(mockedRpcContext, times(NUMBER_OF_RPC_SERVICE_REGISTRATION)).registerRpcServiceImplementation(any
                         (RpcService.class.getClass()), any(RpcService.class));
     }