Bug-4957 Cluster Role change fix 73/35473/2
authorVaclav Demcak <vdemcak@cisco.com>
Mon, 22 Feb 2016 14:52:38 +0000 (15:52 +0100)
committerJozef Bacigal <jbacigal@cisco.com>
Tue, 1 Mar 2016 12:29:21 +0000 (13:29 +0100)
* TxManager shutdown fail has to clean DS (put empty Node)
* DeviceCtx onRoleChange needs call Device Initialization
  functionality but we need to know when we are able to
  put fresh data in to Oper DS and when we could jump
  write functionality
* DeviceCtx contract change for method OnClusterRoleChange

Note: Cluster Master to Slave change could finish during
initialization phase or TxManager could fail in submitting
last transaction state. So we needs a marker for "something
unexpected is happend" and we have to put fresh data to
Operational DataStore

Change-Id: I18e8c0d75e2e5cf195d9a98c1e096fb5517fab72
Signed-off-by: Jozef Bacigal <jbacigal@cisco.com>
Signed-off-by: Vaclav Demcak <vdemcak@cisco.com>
23 files changed:
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceState.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/statistics/StatisticsContext.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceStateImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/DeviceInitializationUtils.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/MdSalRegistratorUtils.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/role/RoleContextImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImplParamTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsManagerImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/util/MdSalRegistratorUtilsTest.java

index d0c282d0ab42896d54dfeec212584903234fd00a..0e96d1aeb85a11e83bf3c6673385303dabc6e95e 100644 (file)
@@ -8,13 +8,13 @@
 
 package org.opendaylight.openflowplugin.api.openflow.device;
 
-import javax.annotation.CheckForNull;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.util.Timeout;
 import java.math.BigInteger;
 import java.util.List;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
 
-import io.netty.util.Timeout;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
@@ -30,6 +30,7 @@ import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegi
 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
+import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
@@ -89,9 +90,10 @@ public interface DeviceContext extends AutoCloseable,
      * newRole is {@link OfpRole#BECOMESLAVE}.
      * Parameters are used as marker to be sure it is change to SLAVE from MASTER or from
      * MASTER to SLAVE and the last parameter "cleanDataStore" is used for validation only.
+     * @param oldRole - old role for quick validation for needed processing
      * @param role - NewRole expect to be {@link OfpRole#BECOMESLAVE} or {@link OfpRole#BECOMEMASTER}
      */
-    ListenableFuture<Void> onClusterRoleChange(@CheckForNull OfpRole role);
+    ListenableFuture<Void> onClusterRoleChange(@Nullable OfpRole oldRole, @CheckForNull OfpRole role);
 
     /**
      * Method creates put operation using provided data in underlying transaction chain.
@@ -188,7 +190,12 @@ public interface DeviceContext extends AutoCloseable,
 
     MultiMsgCollector getMultiMsgCollector(final RequestContext<List<MultipartReply>> requestContext);
 
-    Long getReservedXid();
+    /**
+     * Method is reserved unique XID for Device Message.
+     * Attention: OFJava expect the message, otherwise OutboundQueue could stop working.
+     * @return Reserved XID
+     */
+    Long reservedXidForDeviceMessage();
 
     /**
      * indicates that device context is fully published (e.g.: packetIn messages should be passed)
@@ -211,6 +218,10 @@ public interface DeviceContext extends AutoCloseable,
 
     RpcContext getRpcContext();
 
+    void setStatisticsContext(StatisticsContext statisticsContext);
+
+    StatisticsContext getStatisticsContext();
+
     @Override
     void close();
 }
index 4db91c184bb6043b34a39d0677301115694a46f3..d5fcf47ce94b0beb337487d2ed15af5d988afe11 100644 (file)
@@ -127,5 +127,8 @@ public interface DeviceState {
 
     OfpRole getRole();
 
+    boolean isStatisticsPollingEnabled();
+
+    void setStatisticsPollingEnabledProp(boolean statPollEnabled);
 
 }
index e7c54940ff82a330e824b819c88f668a2defea27..1806b184def74568ff30e381825461f19ec4341b 100644 (file)
@@ -21,6 +21,16 @@ public interface StatisticsContext extends RequestContextStack, AutoCloseable {
 
     ListenableFuture<Boolean> gatherDynamicData();
 
+    /**
+     * Method has to be called from DeviceInitialization Method, otherwise
+     * we are not able to poll anything. Statistics Context normally initialize
+     * this part by initialization process but we don't have this information
+     * in initialization phase and we have to populate whole list after every
+     * device future collecting. Because device future collecting set DeviceState
+     * and we creating marks for the correct kind of stats from DeviceState.
+     */
+    void statListForCollectingInitialization();
+
     /**
      * @param pollTimeout handle to nearest scheduled statistics poll
      */
@@ -35,4 +45,6 @@ public interface StatisticsContext extends RequestContextStack, AutoCloseable {
      * @return dedicated item life cycle change listener (per device)
      */
     ItemLifecycleListener getItemLifeCycleListener();
+    @Override
+    void close();
 }
index 296334dd36554616b9822fcf2ef9af9b04e6d9e1..d5b1cf8874a88f62d61a737e846fdb4149cb7471 100644 (file)
@@ -170,7 +170,7 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
 
         registerMXBean(messageIntelligenceAgency);
 
-        deviceManager = new DeviceManagerImpl(dataBroker, messageIntelligenceAgency, globalNotificationQuota);
+        deviceManager = new DeviceManagerImpl(dataBroker, messageIntelligenceAgency, globalNotificationQuota, switchFeaturesMandatory);
         ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
         roleManager = new RoleManagerImpl(entityOwnershipService, dataBroker, switchFeaturesMandatory);
         statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, isStatisticsPollingOff);
index c9a17b66ff0ad51d79760821b3823975eaf4db1e..70bd4532cce3e29254d2b34225474eca487ae99f 100644 (file)
@@ -9,7 +9,9 @@ package org.opendaylight.openflowplugin.impl.device;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -33,6 +35,7 @@ import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
@@ -58,6 +61,7 @@ import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRe
 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
+import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
@@ -71,6 +75,7 @@ import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl
 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
+import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
 import org.opendaylight.openflowplugin.impl.util.MdSalRegistratorUtils;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
@@ -148,6 +153,9 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     private RpcContext rpcContext;
     private ExtensionConverterProvider extensionConverterProvider;
 
+    private final boolean switchFeaturesMandatory;
+    private StatisticsContext statCtx;
+
 
     @VisibleForTesting
     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
@@ -156,7 +164,9 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
                       @Nonnull final HashedWheelTimer hashedWheelTimer,
                       @Nonnull final MessageSpy _messageSpy,
                       @Nonnull final OutboundQueueProvider outboundQueueProvider,
-            @Nonnull final TranslatorLibrary translatorLibrary) {
+                      @Nonnull final TranslatorLibrary translatorLibrary,
+                      final boolean switchFeaturesMandatory) {
+        this.switchFeaturesMandatory = switchFeaturesMandatory;
         this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
         this.deviceState = Preconditions.checkNotNull(deviceState);
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
@@ -198,7 +208,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     }
 
     @Override
-    public Long getReservedXid() {
+    public Long reservedXidForDeviceMessage() {
         return outboundQueueProvider.reserveEntry();
     }
 
@@ -228,35 +238,99 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     }
 
     @Override
-    public ListenableFuture<Void> onClusterRoleChange(@CheckForNull final OfpRole role) {
-        LOG.debug("onClusterRoleChange {} for node:", role, deviceState.getNodeId());
+    public ListenableFuture<Void> onClusterRoleChange(final OfpRole oldRole, @CheckForNull final OfpRole role) {
+        LOG.trace("onClusterRoleChange {} for node:", role, deviceState.getNodeId());
         Preconditions.checkArgument(role != null);
-        if (rpcContext != null) {
-            // TODO : implement interface for onClusterRoleChange method
-            MdSalRegistratorUtils.registerServices(rpcContext, this, role);
+        if (role.equals(oldRole)) {
+            LOG.debug("Demanded role change for device {} is not change OldRole: {}, NewRole {}", deviceState.getNodeId(), oldRole, role);
+            return Futures.immediateFuture(null);
         }
-
-        final ListenableFuture<Void> nextStepFuture;
         if (OfpRole.BECOMEMASTER.equals(role)) {
-            transactionChainManager.activateTransactionManager();
-            nextStepFuture = Futures.immediateCheckedFuture(null);
-            getDeviceState().setRole(role);
+            if (!deviceState.deviceSynchronized()) {
+                LOG.debug("Setup Device Ctx {} for Master Role", getDeviceState().getNodeId());
+                transactionChainManager.activateTransactionManager();
+                return Futures.immediateCheckedFuture(null);
+            }
+            /* Relevant for no initial Slave-to-Master scenario in cluster */
+            return asyncClusterRoleChange(role);
         } else if (OfpRole.BECOMESLAVE.equals(role)) {
-            nextStepFuture = transactionChainManager.deactivateTransactionManager();
-            Futures.transform(nextStepFuture, new Function<Void, Void>() {
-                @Nullable
-                @Override
-                public Void apply(@Nullable final Void aVoid) {
-                    getDeviceState().setRole(role);
-                    return null;
-                }
-            });
+            if (rpcContext != null) {
+                MdSalRegistratorUtils.registerSlaveServices(rpcContext, role);
+            }
+            return transactionChainManager.deactivateTransactionManager();
         } else {
-            LOG.warn("Unknow OFCluster Role {} for Node {}", role, deviceState.getNodeId());
-            nextStepFuture = Futures.immediateCheckedFuture(null);
+            LOG.warn("Unknown OFCluster Role {} for Node {}", role, deviceState.getNodeId());
+            if (rpcContext != null) {
+                MdSalRegistratorUtils.unregisterServices(rpcContext);
+            }
+            return transactionChainManager.deactivateTransactionManager();
+        }
+    }
+
+    /*
+     * we don't have active TxManager so anything will not be stored to DS yet, but we have
+     * check all NodeInformation for statistics otherwise statistics will not contains
+     * all possible MultipartTypes for polling in StatTypeList
+     */
+    private ListenableFuture<Void> asyncClusterRoleChange(final OfpRole role) {
+        if (statCtx == null) {
+            final String errMsg = String.format("DeviceCtx {} is up but we are missing StatisticsContext", deviceState.getNodeId());
+            LOG.warn(errMsg);
+            return Futures.immediateFailedFuture(new IllegalStateException(errMsg));
         }
 
-        return nextStepFuture;
+        final InstanceIdentifier<FlowCapableNode> ofNodeII = deviceState.getNodeInstanceIdentifier()
+                .augmentation(FlowCapableNode.class);
+        final ReadOnlyTransaction readTx = getReadTransaction();
+        final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> readOfNodeFuture = readTx.read(
+                LogicalDatastoreType.OPERATIONAL, ofNodeII);
+
+        final ListenableFuture<Void> nodeInitInfoFuture = Futures.transform(readOfNodeFuture,
+                new AsyncFunction<Optional<FlowCapableNode>, Void>() {
+                    @Override
+                    public ListenableFuture<Void> apply(final Optional<FlowCapableNode> input) throws Exception {
+                        if (!input.isPresent() || input.get().getTable().isEmpty()) {
+                            /* Last master close fail scenario so we would like to activate TxManager */
+                            LOG.debug("Operational DS for Device {} has to be replaced", deviceState.getNodeId());
+                            getDeviceState().setDeviceSynchronized(false);
+                            transactionChainManager.activateTransactionManager();
+                        }
+                        return DeviceInitializationUtils.initializeNodeInformation(DeviceContextImpl.this, switchFeaturesMandatory);
+                    }
+                });
+
+        final ListenableFuture<Boolean> statPollFuture = Futures.transform(nodeInitInfoFuture,
+                new AsyncFunction<Void, Boolean>() {
+
+                    @Override
+                    public ListenableFuture<Boolean> apply(final Void input) throws Exception {
+                        getStatisticsContext().statListForCollectingInitialization();
+                        if (getDeviceState().deviceSynchronized()) {
+                            return Futures.immediateFuture(Boolean.TRUE);
+                        }
+                        return getStatisticsContext().gatherDynamicData();
+                    }
+                });
+
+        return Futures.transform(statPollFuture, new Function<Boolean, Void>() {
+
+            @Override
+            public Void apply(final Boolean input) {
+                if (!input.booleanValue()) {
+                    LOG.warn("Get Initial Device {} information fails", getDeviceState().getNodeId());
+                    DeviceContextImpl.this.close();
+                    return null;
+                }
+                LOG.debug("Get Initial Device {} information is successful", getDeviceState().getNodeId());
+                if (null != getRpcContext()) {
+                    MdSalRegistratorUtils.registerMasterServices(getRpcContext(), DeviceContextImpl.this, role);
+                }
+                getDeviceState().setDeviceSynchronized(true);
+                getDeviceState().setStatisticsPollingEnabledProp(true);
+                transactionChainManager.activateTransactionManager();
+                return null;
+            }
+        });
     }
 
     @Override
@@ -482,13 +556,13 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
 
                 @Override
                 public void onSuccess(final Void result) {
-                    LOG.info("Delete Node {} was successfull.", deviceState.getNodeId());
+                    LOG.info("TxChain {} was shutdown successfull.", deviceState.getNodeId());
                     tearDownClean();
                 }
 
                 @Override
                 public void onFailure(final Throwable t) {
-                    LOG.warn("Delete Node {} fail.", deviceState.getNodeId(), t);
+                    LOG.warn("Shutdown TxChain {} fail.", deviceState.getNodeId(), t);
                     tearDownClean();
                 }
             });
@@ -609,4 +683,14 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     public ExtensionConverterProvider getExtensionConverterProvider() {
         return extensionConverterProvider;
     }
+
+    @Override
+    public void setStatisticsContext(final StatisticsContext statisticsContext) {
+        this.statCtx = statisticsContext;
+    }
+
+    @Override
+    public StatisticsContext getStatisticsContext() {
+        return statCtx;
+    }
 }
index ec0244356788b16624bb4dbafe083a59b81da87a..ca2567b091ef905af56b7cfb2f38984ec5261b5d 100644 (file)
@@ -55,6 +55,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
 
     private static final long TICK_DURATION = 10; // 0.5 sec.
     private final long globalNotificationQuota;
+    private final boolean switchFeaturesMandatory;
     private ScheduledThreadPoolExecutor spyPool;
     private final int spyRate = 10;
 
@@ -74,7 +75,8 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
 
     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
                              @Nonnull final MessageIntelligenceAgency messageIntelligenceAgency,
-                             final long globalNotificationQuota) {
+                             final long globalNotificationQuota, final boolean switchFeaturesMandatory) {
+        this.switchFeaturesMandatory = switchFeaturesMandatory;
         this.globalNotificationQuota = globalNotificationQuota;
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
         hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, 500);
@@ -103,6 +105,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
     @Override
     public void onDeviceContextLevelUp(final DeviceContext deviceContext) {
         // final phase - we have to add new Device to MD-SAL DataStore
+        LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceContext.getDeviceState().getNodeId());
         Preconditions.checkNotNull(deviceContext);
         try {
             ((DeviceContextImpl) deviceContext).initialSubmitTransaction();
@@ -113,7 +116,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
             LOG.trace("Problem with add node {} to OPERATIONAL DataStore", deviceContext.getDeviceState().getNodeId(), e);
             try {
                 deviceContext.close();
-            } catch (Exception e1) {
+            } catch (final Exception e1) {
                 LOG.warn("Exception on device context close. ", e);
             }
         }
@@ -145,7 +148,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
 
         final DeviceState deviceState = createDeviceState(connectionContext);
         final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker,
-                hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary);
+                hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, switchFeaturesMandatory);
 
         deviceContext.addDeviceContextClosedHandler(this);
         Verify.verify(deviceContexts.putIfAbsent(connectionContext.getNodeId(), deviceContext) == null);
@@ -230,7 +233,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
     }
 
     @Override
-    public void setExtensionConverterProvider(ExtensionConverterProvider extensionConverterProvider) {
+    public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
         this.extensionConverterProvider = extensionConverterProvider;
     }
 
index 0a69374b5807040517c59797f2c05281baf65b0c..d49021ed1e067e1e508b2617f9f148b6dfb2332c 100644 (file)
@@ -46,6 +46,7 @@ class DeviceStateImpl implements DeviceState {
     private boolean flowStatisticsAvailable;
     private boolean tableStatisticsAvailable;
     private boolean portStatisticsAvailable;
+    private boolean statPollEnabled;
     private boolean queueStatisticsAvailable;
     private volatile OfpRole role;
 
@@ -55,6 +56,8 @@ class DeviceStateImpl implements DeviceState {
         this.nodeId = Preconditions.checkNotNull(nodeId);
         nodeII = DeviceStateUtil.createNodeInstanceIdentifier(nodeId);
         version = featuresReply.getVersion();
+        statPollEnabled = false;
+        deviceSynchronized = false;
     }
 
     @Override
@@ -167,4 +170,14 @@ class DeviceStateImpl implements DeviceState {
     public void setRole(OfpRole role) {
         this.role = role;
     }
+
+    @Override
+    public boolean isStatisticsPollingEnabled() {
+        return statPollEnabled;
+    }
+
+    @Override
+    public void setStatisticsPollingEnabledProp(final boolean statPollEnabled) {
+        this.statPollEnabled = statPollEnabled;
+    }
 }
index 482d2574ad283bf964f195b18d3973d85b26b997..b178f66b5607a8ec1a983a52e3f764a165b94cbc 100644 (file)
@@ -12,7 +12,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.FutureFallback;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
@@ -97,7 +99,7 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
      * transactions. Call this method for MASTER role only.
      */
     public void activateTransactionManager() {
-        LOG.trace("activetTransactionManaager for node {} transaction submit is set to {}", deviceState.getNodeId());
+        LOG.trace("activateTransactionManager for node {} transaction submit is set to {}", deviceState.getNodeId());
         synchronized (txLock) {
             if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) {
                 LOG.debug("Transaction Factory create {}", deviceState.getNodeId());
@@ -264,6 +266,15 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
             wTx.merge(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
             future = wTx.submit();
             wTx = null;
+            Futures.withFallback(future, new FutureFallback<Void>() {
+
+                @Override
+                public ListenableFuture<Void> create(final Throwable t) throws Exception {
+                    final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
+                    delWtx.put(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
+                    return delWtx.submit();
+                }
+            });
         }
         return future;
     }
index 41bbe2eea4e6521843793fc09b52f8a3cff5cd50..c125f030371e136ebf1a6aeb64eacde187f1fcee 100644 (file)
@@ -81,9 +81,9 @@ public class RoleContextImpl implements RoleContext {
     public void initialization() throws CandidateAlreadyRegisteredException {
         state = ROLE_CONTEXT_STATE.STARTING;
         LOG.debug("Initialization requestOpenflowEntityOwnership for entity {}", entity);
-            entityOwnershipCandidateRegistration = entityOwnershipService.registerCandidate(entity);
-            LOG.debug("RoleContextImpl : Candidate registered with ownership service for device :{}", deviceContext
-                    .getPrimaryConnectionContext().getNodeId().getValue());
+        entityOwnershipCandidateRegistration = entityOwnershipService.registerCandidate(entity);
+        LOG.debug("RoleContextImpl : Candidate registered with ownership service for device :{}",
+                deviceContext.getPrimaryConnectionContext().getNodeId().getValue());
     }
 
     @Override
@@ -103,43 +103,38 @@ public class RoleContextImpl implements RoleContext {
         LOG.debug("Role change received from ownership listener from {} to {} for device:{}", oldRole, newRole,
                 deviceContext.getPrimaryConnectionContext().getNodeId());
 
-        final SetRoleInput setRoleInput = (new SetRoleInputBuilder())
-                .setControllerRole(newRole)
-                .setNode(new NodeRef(deviceContext.getDeviceState().getNodeInstanceIdentifier()))
-                .build();
-
-        final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture = salRoleService.setRole(setRoleInput);
-
-        return Futures.transform(JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture),
-                new AsyncFunction<RpcResult<SetRoleOutput>, Void>() {
-                    @Override
-                    public ListenableFuture<Void> apply(final RpcResult<SetRoleOutput> setRoleOutputRpcResult) throws Exception {
-                        LOG.debug("Rolechange {} successful made on switch :{}", newRole, deviceContext.getDeviceState().getNodeId());
-                        final ListenableFuture<Void> nextStepFuture;
-                        switch (state) {
-                            case STARTING:
-                                if (OfpRole.BECOMESLAVE.equals(newRole)) {
-                                    getDeviceState().setRole(newRole);
-                                    nextStepFuture = Futures.immediateFuture(null);
-                                } else if (OfpRole.BECOMEMASTER.equals(newRole)) {
-                                    nextStepFuture = deviceContext.onClusterRoleChange(newRole);
-                                } else {
-                                    nextStepFuture = Futures.immediateFuture(null);
-                                }
-
-                                break;
-                            case WORKING:
-                                nextStepFuture = deviceContext.onClusterRoleChange(newRole);
-                                break;
-                            //case TEARING_DOWN:
-                            default:
-                                nextStepFuture = Futures.immediateFuture(null);
-                                break;
-                        }
-
-                        return nextStepFuture;
+
+        final AsyncFunction<RpcResult<SetRoleOutput>, Void> roleChangeRpcFunction = new AsyncFunction<RpcResult<SetRoleOutput>, Void>() {
+            @Override
+            public ListenableFuture<Void> apply(final RpcResult<SetRoleOutput> setRoleOutputRpcResult) throws Exception {
+                LOG.debug("Rolechange {} successful made on switch :{}", newRole, deviceContext.getDeviceState().getNodeId());
+                final ListenableFuture<Void> nextStepFuture;
+                switch (state) {
+                case STARTING:
+                    if (OfpRole.BECOMESLAVE.equals(newRole)) {
+                        getDeviceState().setRole(newRole);
+                        nextStepFuture = Futures.immediateFuture(null);
+                    } else if (OfpRole.BECOMEMASTER.equals(newRole)) {
+                        nextStepFuture = deviceContext.onClusterRoleChange(oldRole, newRole);
+                    } else {
+                        nextStepFuture = Futures.immediateFuture(null);
                     }
-                });
+
+                    break;
+                case WORKING:
+                    nextStepFuture = deviceContext.onClusterRoleChange(oldRole, newRole);
+                    break;
+                //case TEARING_DOWN:
+                default:
+                    nextStepFuture = Futures.immediateFuture(null);
+                    break;
+                }
+
+                return nextStepFuture;
+            }
+        };
+
+        return sendRoleChangeToDevice(newRole, roleChangeRpcFunction);
     }
 
     @Override
@@ -177,7 +172,7 @@ public class RoleContextImpl implements RoleContext {
     @Nullable
     @Override
     public <T> RequestContext<T> createRequestContext() {
-        final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.getReservedXid()) {
+        final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.reservedXidForDeviceMessage()) {
             @Override
             public void close() {
             }
index 9e5e1ff1aaacb4aa1aee5a58c640ac8e90f41d70..3ed9c45390a94cdd79a508e318b0421ebe6906af 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.openflowplugin.impl.role;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.base.Verify;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.CheckedFuture;
@@ -35,7 +34,6 @@ import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipS
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
@@ -83,11 +81,6 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
     @Override
     public void onDeviceContextLevelUp(@CheckForNull final DeviceContext deviceContext) throws Exception {
         LOG.debug("RoleManager called for device:{}", deviceContext.getPrimaryConnectionContext().getNodeId());
-        if (deviceContext.getDeviceState().getFeatures().getVersion() < OFConstants.OFP_VERSION_1_3) {
-            // Roles are not supported before OF1.3, so move forward.
-            deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
-            return;
-        }
 
         final RoleContext roleContext = new RoleContextImpl(deviceContext, entityOwnershipService,
                 makeEntity(deviceContext.getDeviceState().getNodeId()),
@@ -188,21 +181,26 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
     @Override
     public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
         Preconditions.checkArgument(ownershipChange != null);
+        RoleContext roleCtxForClose = null;
         try {
             final RoleContext roleContext = contexts.get(ownershipChange.getEntity());
             if (roleContext != null) {
+                roleCtxForClose = roleContext;
                 changeForEntity(ownershipChange, roleContext);
                 return;
             }
 
             final RoleContext txRoleContext = txContexts.get(ownershipChange.getEntity());
             if (txRoleContext != null) {
+                roleCtxForClose = txRoleContext;
                 changeForTxEntity(ownershipChange, txRoleContext);
                 return;
             }
         } catch (final InterruptedException e) {
             LOG.warn("fail to acquire semaphore: {}", ownershipChange.getEntity());
-            // FIXME: consider forcibly closing this connection
+            if (roleCtxForClose != null) {
+                roleCtxForClose.close();
+            }
         }
 
         LOG.debug("We are not able to find Entity {} ownershipChange {} - disregarding ownership notification",
@@ -337,21 +335,19 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
                 try {
                     LOG.debug("Node {} is marked as LEADER", nodeId);
                     Verify.verify(txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext) == null,
-                            "RoleCtx for TxEntity {} master Node {} is still not closed.",
-                            roleContext.getTxEntity(), nodeId);
+                            "RoleCtx for TxEntity {} master Node {} is still not closed.", roleContext.getTxEntity(), nodeId);
                     roleContext.setPropagatingRole(OfpRole.BECOMEMASTER);
 
                     // try to register tx-candidate via ownership service
                     roleContext.setupTxCandidate();
                 } catch (final CandidateAlreadyRegisteredException e) {
-                    LOG.debug("txCandidate registration failed");
+                    LOG.warn("txCandidate registration failed {}", roleContext.getDeviceState().getNodeId(), e);
                     // --- CLEAN UP ---
                     // withdraw context from map in order to have it as before
                     txContexts.remove(roleContext.getTxEntity(), roleContext);
                     // no more propagating any role - there is no txCandidate lock approaching
                     roleContext.setPropagatingRole(null);
-
-                    Throwables.propagate(e);
+                    roleContext.getDeviceContext().close();
                 }
                 return null;
             }
@@ -362,7 +358,6 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
         final Semaphore mainCandidateGuard = roleContext.getMainCandidateGuard();
         LOG.trace("mainCandidate lock queue: " + mainCandidateGuard.getQueueLength());
         mainCandidateGuard.acquire();
-        //FIXME : check again implementation for double candidate scenario
         LOG.info("Received EntityOwnershipChange:{}", ownershipChange);
 
         if (roleContext.getDeviceState().isValid()) {
@@ -382,9 +377,14 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
                 } else {
                     txProcessCallback = null;
                 }
-            } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.isOwner()) {
+            } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.hasOwner()) {
                 // SLAVE -> MASTER
                 txProcessCallback = makeTxEntitySetupCallback(roleContext);
+            } else if (!ownershipChange.wasOwner() && !ownershipChange.isOwner() && ownershipChange.hasOwner()) {
+                if (RoleContext.ROLE_CONTEXT_STATE.STARTING.equals(roleContext.getState())) {
+                    rolePropagationFx = roleContext.onRoleChanged(oldRole, newRole);
+                }
+                txProcessCallback = null;
             } else {
                 LOG.trace("Main candidate role change case not covered: {} -> {} .. NOOP", oldRole, newRole);
                 txProcessCallback = null;
@@ -396,22 +396,22 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
 
             // catching result
             Futures.addCallback(rolePropagationFx, new FutureCallback<Void>() {
-                        @Override
-                        public void onSuccess(@Nullable final Void aVoid) {
-                            LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}",
-                                    ownershipChange.getEntity(), oldRole, newRole);
-                            mainCandidateGuard.release();
-                        }
+                @Override
+                public void onSuccess(@Nullable final Void aVoid) {
+                    LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}",
+                            ownershipChange.getEntity(), oldRole, newRole);
+                    roleContext.setPropagatingRole(newRole);
+                    mainCandidateGuard.release();
+                }
 
-                        @Override
-                        public void onFailure(final Throwable throwable) {
-                            LOG.warn("Main candidate role propagation FAILED for entity: {}, {} -> {}",
-                                    ownershipChange.getEntity(), oldRole, newRole);
-                            mainCandidateGuard.release();
-                            roleContext.getDeviceContext().close();
-                        }
-                    }
-            );
+                @Override
+                public void onFailure(final Throwable throwable) {
+                    LOG.warn("Main candidate role propagation FAILED for entity: {}, {} -> {}",
+                            ownershipChange.getEntity(), oldRole, newRole);
+                    mainCandidateGuard.release();
+                    roleContext.getDeviceContext().close();
+                }
+            });
 
         } else {
             LOG.debug("We are closing connection for entity {}", ownershipChange.getEntity());
@@ -451,22 +451,23 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
         return delFuture;
     }
 
-    private void unregistrationHelper(final EntityOwnershipChange ownershipChange, final RoleChangeListener roleChangeListener) {
+
+    private void unregistrationHelper(final EntityOwnershipChange ownershipChange, final RoleContext roleContext) {
         LOG.info("Initiate removal from operational. Possibly the last node to be disconnected for :{}. ", ownershipChange);
-        Futures.addCallback(removeDeviceFromOperDS(roleChangeListener), new FutureCallback<Void>() {
+        Futures.addCallback(removeDeviceFromOperDS(roleContext), new FutureCallback<Void>() {
             @Override
             public void onSuccess(@Nullable final Void aVoid) {
-                LOG.debug("Freeing roleContext slot for device: {}", roleChangeListener.getDeviceState().getNodeId());
-                contexts.remove(ownershipChange.getEntity(), roleChangeListener);
-                ((RoleContext) roleChangeListener).suspendTxCandidate();
+                LOG.debug("Freeing roleContext slot for device: {}", roleContext.getDeviceState().getNodeId());
+                contexts.remove(ownershipChange.getEntity(), roleContext);
+                roleContext.suspendTxCandidate();
             }
 
             @Override
             public void onFailure(final Throwable throwable) {
-                LOG.warn("NOT freeing roleContext slot for device: {}, {}", roleChangeListener.getDeviceState()
+                LOG.warn("NOT freeing roleContext slot for device: {}, {}", roleContext.getDeviceState()
                         .getNodeId(), throwable.getMessage());
-                contexts.remove(ownershipChange.getEntity(), roleChangeListener);
-                ((RoleContext) roleChangeListener).suspendTxCandidate();
+                contexts.remove(ownershipChange.getEntity(), roleContext);
+                roleContext.suspendTxCandidate();
             }
         });
     }
index 9b4cdaffbe4e71ba07f15829135d0cf96cab5f21..b450077582f1238fc11ac25b8d23bdf75a1f1b1e 100644 (file)
@@ -97,12 +97,12 @@ public class RpcContextImpl implements RpcContext {
             LOG.info("Acquired semaphore for {}, available permits:{} ", deviceContext.getDeviceState().getNodeId(), tracker.availablePermits());
         }
 
-        final Long xid = deviceContext.getReservedXid();
+        final Long xid = deviceContext.reservedXidForDeviceMessage();
         if (xid == null) {
             LOG.error("Xid cannot be reserved for new RequestContext, node:{}", deviceContext.getDeviceState().getNodeId());
         }
 
-        return new AbstractRequestContext<T>(deviceContext.getReservedXid()) {
+        return new AbstractRequestContext<T>(deviceContext.reservedXidForDeviceMessage()) {
             @Override
             public void close() {
                 tracker.release();
index 94d72a3494a37ad98dc00ad2caa6d9dc1112931e..67d08bb0a72c3d0fa2dc9e71d3e5eddf6e2dcd30 100644 (file)
@@ -45,8 +45,8 @@ public class RpcManagerImpl implements RpcManager {
 
     @Override
     public void onDeviceContextLevelUp(final DeviceContext deviceContext) throws Exception {
-        NodeId nodeId = deviceContext.getDeviceState().getNodeId();
-        OfpRole ofpRole = deviceContext.getDeviceState().getRole();
+        final NodeId nodeId = deviceContext.getDeviceState().getNodeId();
+        final OfpRole ofpRole = deviceContext.getDeviceState().getRole();
 
         LOG.debug("Node:{}, deviceContext.getDeviceState().getRole():{}", nodeId, ofpRole);
 
@@ -56,28 +56,26 @@ public class RpcManagerImpl implements RpcManager {
             contexts.put(deviceContext, rpcContext);
         }
 
+        deviceContext.addDeviceContextClosedHandler(this);
 
-        if (ofpRole == OfpRole.BECOMESLAVE) {
-            // if slave, we need to de-register rpcs if any have been registered, in case of master to slave
-            LOG.info("Unregistering RPC registration (if any) for slave role for node:{}", deviceContext.getDeviceState().getNodeId());
-            try {
-                MdSalRegistratorUtils.unregisterServices(rpcContext);
-            } catch (Exception e) {
-                LOG.error("Exception while unregistering rpcs for slave role for node:{}. But continuing.", nodeId, e);
-            }
-
-        } else {
+        if (OfpRole.BECOMEMASTER.equals(ofpRole)) {
             LOG.info("Registering Openflow RPCs for node:{}, role:{}", nodeId, ofpRole);
-            MdSalRegistratorUtils.registerServices(rpcContext, deviceContext, ofpRole);
+            MdSalRegistratorUtils.registerMasterServices(rpcContext, deviceContext, ofpRole);
 
             if (isStatisticsRpcEnabled) {
                 MdSalRegistratorUtils.registerStatCompatibilityServices(rpcContext, deviceContext,
                         notificationPublishService, new AtomicLong());
             }
+        } else if(OfpRole.BECOMESLAVE.equals(ofpRole)) {
+            // if slave, we need to de-register rpcs if any have been registered, in case of master to slave
+            LOG.info("Unregistering RPC registration (if any) for slave role for node:{}", deviceContext.getDeviceState().getNodeId());
+            MdSalRegistratorUtils.registerSlaveServices(rpcContext, ofpRole);
+        } else {
+            // if we don't know role, we need to unregister rpcs if any have been registered
+            LOG.info("Unregistering RPC registration (if any) for slave role for node:{}", deviceContext.getDeviceState().getNodeId());
+            MdSalRegistratorUtils.unregisterServices(rpcContext);
         }
 
-        deviceContext.addDeviceContextClosedHandler(this);
-
         // finish device initialization cycle back to DeviceManager
         deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
     }
@@ -89,24 +87,25 @@ public class RpcManagerImpl implements RpcManager {
 
 
     @Override
-    public void onDeviceContextClosed(DeviceContext deviceContext) {
-        RpcContext removedContext = contexts.remove(deviceContext);
+    public void onDeviceContextClosed(final DeviceContext deviceContext) {
+        final RpcContext removedContext = contexts.remove(deviceContext);
         if (removedContext != null) {
             try {
                 LOG.info("Unregistering rpcs for device context closure");
                 removedContext.close();
-            } catch (Exception e) {
+            } catch (final Exception e) {
                 LOG.error("Exception while unregistering rpcs onDeviceContextClosed handler for node:{}. But continuing.",
                         deviceContext.getDeviceState().getNodeId(), e);
             }
         }
     }
-    public void setStatisticsRpcEnabled(boolean isStatisticsRpcEnabled) {
+    @Override
+    public void setStatisticsRpcEnabled(final boolean isStatisticsRpcEnabled) {
         this.isStatisticsRpcEnabled = isStatisticsRpcEnabled;
     }
 
     @Override
-    public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
+    public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
         this.notificationPublishService = notificationPublishService;
     }
 }
index 3ef1a95eb753bc1d1179f0e0f45a4f0b931b66ef..46ecf390ce3ca11c6ab8b71babc0f19bc80c79c4 100644 (file)
@@ -12,6 +12,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -23,6 +24,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import javax.annotation.CheckForNull;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
@@ -51,213 +53,233 @@ public class StatisticsContextImpl implements StatisticsContext {
     private final DeviceContext deviceContext;
     private final DeviceState devState;
     private final ListenableFuture<Boolean> emptyFuture;
-    private final List<MultipartType> collectingStatType;
+    private final boolean shuttingDownStatisticsPolling;
+    private final Object COLLECTION_STAT_TYPE_LOCK = new Object();
+    @GuardedBy("COLLECTION_STAT_TYPE_LOCK")
+    private List<MultipartType> collectingStatType;
 
     private StatisticsGatheringService statisticsGatheringService;
     private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService;
     private Timeout pollTimeout;
 
-    public StatisticsContextImpl(@CheckForNull final DeviceContext deviceContext) {
+    public StatisticsContextImpl(@CheckForNull final DeviceContext deviceContext,
+            final boolean shuttingDownStatisticsPolling) {
         this.deviceContext = Preconditions.checkNotNull(deviceContext);
-        devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
+        this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
+        this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling;
         emptyFuture = Futures.immediateFuture(new Boolean(false));
         statisticsGatheringService = new StatisticsGatheringService(this, deviceContext);
         statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext);
-
-        final List<MultipartType> statListForCollecting = new ArrayList<>();
-        if (devState.isTableStatisticsAvailable()) {
-            statListForCollecting.add(MultipartType.OFPMPTABLE);
-        }
-        if (devState.isFlowStatisticsAvailable()) {
-            statListForCollecting.add(MultipartType.OFPMPFLOW);
-        }
-        if (devState.isGroupAvailable()) {
-            statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
-            statListForCollecting.add(MultipartType.OFPMPGROUP);
-        }
-        if (devState.isMetersAvailable()) {
-            statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
-            statListForCollecting.add(MultipartType.OFPMPMETER);
-        }
-        if (devState.isPortStatisticsAvailable()) {
-            statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
-        }
-        if (devState.isQueueStatisticsAvailable()) {
-            statListForCollecting.add(MultipartType.OFPMPQUEUE);
-        }
-        collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
         itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
+        statListForCollectingInitialization();
+        deviceContext.setStatisticsContext(StatisticsContextImpl.this);
     }
 
     @Override
-    public ListenableFuture<Boolean> gatherDynamicData() {
-        final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
-        if (errorResultFuture != null) {
-            return errorResultFuture;
+    public void statListForCollectingInitialization() {
+        synchronized (COLLECTION_STAT_TYPE_LOCK) {
+            final List<MultipartType> statListForCollecting = new ArrayList<>();
+            if (devState.isTableStatisticsAvailable()) {
+                statListForCollecting.add(MultipartType.OFPMPTABLE);
+            }
+            if (devState.isFlowStatisticsAvailable()) {
+                statListForCollecting.add(MultipartType.OFPMPFLOW);
+            }
+            if (devState.isGroupAvailable()) {
+                statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
+                statListForCollecting.add(MultipartType.OFPMPGROUP);
+            }
+            if (devState.isMetersAvailable()) {
+                statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
+                statListForCollecting.add(MultipartType.OFPMPMETER);
+            }
+            if (devState.isPortStatisticsAvailable()) {
+                statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
+            }
+            if (devState.isQueueStatisticsAvailable()) {
+                statListForCollecting.add(MultipartType.OFPMPQUEUE);
+            }
+            collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
         }
-        final Iterator<MultipartType> statIterator = collectingStatType.iterator();
-        final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
-        statChainFuture(statIterator, settableStatResultFuture);
-        return settableStatResultFuture;
     }
 
-    private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType) {
-        switch (multipartType) {
-            case OFPMPFLOW:
-                return collectFlowStatistics(multipartType);
-            case OFPMPTABLE:
-                return collectTableStatistics(multipartType);
-            case OFPMPPORTSTATS:
-                return collectPortStatistics(multipartType);
-            case OFPMPQUEUE:
-                return collectQueueStatistics(multipartType);
-            case OFPMPGROUPDESC:
-                return collectGroupDescStatistics(multipartType);
-            case OFPMPGROUP:
-                return collectGroupStatistics(multipartType);
-            case OFPMPMETERCONFIG:
-                return collectMeterConfigStatistics(multipartType);
-            case OFPMPMETER:
-                return collectMeterStatistics(multipartType);
-            default:
-                LOG.warn("Unsuported Statistics type {}", multipartType);
-                return Futures.immediateCheckedFuture(Boolean.TRUE);
+        @Override
+        public ListenableFuture<Boolean> gatherDynamicData() {
+            if (shuttingDownStatisticsPolling) {
+                LOG.debug("Statistics for device {} is not enabled.", deviceContext.getDeviceState().getNodeId());
+                return Futures.immediateFuture(Boolean.TRUE);
+            }
+            final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
+            if (errorResultFuture != null) {
+                return errorResultFuture;
+            }
+            synchronized (COLLECTION_STAT_TYPE_LOCK) {
+                final Iterator<MultipartType> statIterator = collectingStatType.iterator();
+                final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
+                statChainFuture(statIterator, settableStatResultFuture);
+                return settableStatResultFuture;
+            }
         }
-    }
 
-    @Override
-    public <T> RequestContext<T> createRequestContext() {
-        final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.getReservedXid()) {
-            @Override
-            public void close() {
-                requestContexts.remove(this);
+        private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType){
+            switch (multipartType) {
+                case OFPMPFLOW:
+                    return collectFlowStatistics(multipartType);
+                case OFPMPTABLE:
+                    return collectTableStatistics(multipartType);
+                case OFPMPPORTSTATS:
+                    return collectPortStatistics(multipartType);
+                case OFPMPQUEUE:
+                    return collectQueueStatistics(multipartType);
+                case OFPMPGROUPDESC:
+                    return collectGroupDescStatistics(multipartType);
+                case OFPMPGROUP:
+                    return collectGroupStatistics(multipartType);
+                case OFPMPMETERCONFIG:
+                    return collectMeterConfigStatistics(multipartType);
+                case OFPMPMETER:
+                    return collectMeterStatistics(multipartType);
+                default:
+                    LOG.warn("Unsuported Statistics type {}", multipartType);
+                    return Futures.immediateCheckedFuture(Boolean.TRUE);
             }
-        };
-        requestContexts.add(ret);
-        return ret;
-    }
-
-    @Override
-    public void close() {
-        for (final RequestContext<?> requestContext : requestContexts) {
-            RequestContextUtil.closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED);
         }
-        if (null != pollTimeout && !pollTimeout.isExpired()) {
-            pollTimeout.cancel();
-        }
-    }
 
-    @Override
-    public void setPollTimeout(Timeout pollTimeout) {
-        this.pollTimeout = pollTimeout;
-    }
 
-    @Override
-    public Optional<Timeout> getPollTimeout() {
-        return Optional.fromNullable(pollTimeout);
-    }
+        @Override
+        public <T> RequestContext<T> createRequestContext() {
+            final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.reservedXidForDeviceMessage()) {
+                @Override
+                public void close() {
+                    requestContexts.remove(this);
+                }
+            };
+            requestContexts.add(ret);
+            return ret;
+        }
+
+        @Override
+        public void close () {
+            for (final RequestContext<?> requestContext : requestContexts) {
+                RequestContextUtil.closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED);
+            }
+            if (null != pollTimeout && !pollTimeout.isExpired()) {
+                pollTimeout.cancel();
+            }
+        }
 
-    void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture) {
-        if ( ! iterator.hasNext()) {
-            resultFuture.set(Boolean.TRUE);
-            LOG.debug("Stats collection successfully finished for node {}", deviceContext.getDeviceState().getNodeId());
-            return;
+        @Override
+        public void setPollTimeout (Timeout pollTimeout){
+            this.pollTimeout = pollTimeout;
         }
 
-        final MultipartType nextType = iterator.next();
-        LOG.debug("Stats iterating to next type for node {} of type {}", deviceContext.getDeviceState().getNodeId(), nextType);
+        @Override
+        public Optional<Timeout> getPollTimeout () {
+            return Optional.fromNullable(pollTimeout);
+        }
 
-        final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType);
-        Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
-            @Override
-            public void onSuccess(final Boolean result) {
-                statChainFuture(iterator, resultFuture);
+        void statChainFuture ( final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture){
+            if (!iterator.hasNext()) {
+                resultFuture.set(Boolean.TRUE);
+                LOG.debug("Stats collection successfully finished for node {}", deviceContext.getDeviceState().getNodeId());
+                return;
             }
-            @Override
-            public void onFailure(final Throwable t) {
-                resultFuture.setException(t);
-            }
-        });
-    }
 
-    /**
-     * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
-     * which has to be returned from caller too
-     *
-     * @return
-     */
-    @VisibleForTesting
-    ListenableFuture<Boolean> deviceConnectionCheck() {
-        if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
-            ListenableFuture<Boolean> resultingFuture = SettableFuture.create();
-            switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
-                case RIP:
-                    final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
-                            deviceContext.getPrimaryConnectionContext().getConnectionState());
-                    resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
-                    break;
-                default:
-                    resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
-                    break;
+            final MultipartType nextType = iterator.next();
+            LOG.debug("Stats iterating to next type for node {} of type {}", deviceContext.getDeviceState().getNodeId(), nextType);
+
+            final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType);
+            Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
+                @Override
+                public void onSuccess(final Boolean result) {
+                    statChainFuture(iterator, resultFuture);
+                }
+
+                @Override
+                public void onFailure(final Throwable t) {
+                    resultFuture.setException(t);
+                }
+            });
+        }
+
+        /**
+         * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
+         * which has to be returned from caller too
+         *
+         * @return
+         */
+        @VisibleForTesting
+        ListenableFuture<Boolean> deviceConnectionCheck () {
+            if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
+                ListenableFuture<Boolean> resultingFuture = SettableFuture.create();
+                switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
+                    case RIP:
+                        final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
+                                deviceContext.getPrimaryConnectionContext().getConnectionState());
+                        resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
+                        break;
+                    default:
+                        resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
+                        break;
+                }
+                return resultingFuture;
             }
-            return resultingFuture;
+            return null;
         }
-        return null;
-    }
 
-    private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType) {
-        return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
-                statisticsGatheringOnTheFlyService, deviceContext, /*MultipartType.OFPMPFLOW*/ multipartType) : emptyFuture;
-    }
+        private ListenableFuture<Boolean> collectFlowStatistics ( final MultipartType multipartType){
+            return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+                    statisticsGatheringOnTheFlyService, deviceContext, /*MultipartType.OFPMPFLOW*/ multipartType) : emptyFuture;
+        }
 
-    private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
-        return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
-                statisticsGatheringService, deviceContext, /*MultipartType.OFPMPTABLE*/ multipartType) : emptyFuture;
-    }
+        private ListenableFuture<Boolean> collectTableStatistics ( final MultipartType multipartType){
+            return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+                    statisticsGatheringService, deviceContext, /*MultipartType.OFPMPTABLE*/ multipartType) : emptyFuture;
+        }
 
-    private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
-        return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
-                statisticsGatheringService, deviceContext, /*MultipartType.OFPMPPORTSTATS*/ multipartType) : emptyFuture;
-    }
+        private ListenableFuture<Boolean> collectPortStatistics ( final MultipartType multipartType){
+            return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+                    statisticsGatheringService, deviceContext, /*MultipartType.OFPMPPORTSTATS*/ multipartType) : emptyFuture;
+        }
 
-    private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
-        return devState.isQueueStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
-                statisticsGatheringService, deviceContext, /*MultipartType.OFPMPQUEUE*/ multipartType) : emptyFuture;
-    }
+        private ListenableFuture<Boolean> collectQueueStatistics ( final MultipartType multipartType){
+            return devState.isQueueStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+                    statisticsGatheringService, deviceContext, /*MultipartType.OFPMPQUEUE*/ multipartType) : emptyFuture;
+        }
 
-    private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
-        return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
-                statisticsGatheringService, deviceContext, /*MultipartType.OFPMPGROUPDESC*/ multipartType) : emptyFuture;
-    }
+        private ListenableFuture<Boolean> collectGroupDescStatistics ( final MultipartType multipartType){
+            return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+                    statisticsGatheringService, deviceContext, /*MultipartType.OFPMPGROUPDESC*/ multipartType) : emptyFuture;
+        }
 
-    private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
-        return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
-                statisticsGatheringService, deviceContext, /*MultipartType.OFPMPGROUP*/ multipartType) : emptyFuture;
-    }
+        private ListenableFuture<Boolean> collectGroupStatistics ( final MultipartType multipartType){
+            return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+                    statisticsGatheringService, deviceContext, /*MultipartType.OFPMPGROUP*/ multipartType) : emptyFuture;
+        }
 
-    private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
-        return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
-                statisticsGatheringService, deviceContext, /*MultipartType.OFPMPMETERCONFIG*/ multipartType) : emptyFuture;
-    }
+        private ListenableFuture<Boolean> collectMeterConfigStatistics ( final MultipartType multipartType){
+            return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+                    statisticsGatheringService, deviceContext, /*MultipartType.OFPMPMETERCONFIG*/ multipartType) : emptyFuture;
+        }
 
-    private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
-        return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
-                statisticsGatheringService, deviceContext, /*MultipartType.OFPMPMETER*/ multipartType) : emptyFuture;
-    }
+        private ListenableFuture<Boolean> collectMeterStatistics ( final MultipartType multipartType){
+            return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
+                    statisticsGatheringService, deviceContext, /*MultipartType.OFPMPMETER*/ multipartType) : emptyFuture;
+        }
 
-    @VisibleForTesting
-    protected void setStatisticsGatheringService(StatisticsGatheringService statisticsGatheringService) {
-        this.statisticsGatheringService = statisticsGatheringService;
-    }
+        @VisibleForTesting
+        protected void setStatisticsGatheringService ( final StatisticsGatheringService statisticsGatheringService){
+            this.statisticsGatheringService = statisticsGatheringService;
+        }
 
-    @VisibleForTesting
-    protected void setStatisticsGatheringOnTheFlyService(StatisticsGatheringOnTheFlyService
-                                                             statisticsGatheringOnTheFlyService) {
-        this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
-    }
+        @VisibleForTesting
+        protected void setStatisticsGatheringOnTheFlyService ( final StatisticsGatheringOnTheFlyService
+        statisticsGatheringOnTheFlyService){
+            this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
+        }
 
-    @Override
-    public ItemLifecycleListener getItemLifeCycleListener() {
-        return itemLifeCycleListener;
+        @Override
+        public ItemLifecycleListener getItemLifeCycleListener () {
+            return itemLifeCycleListener;
+        }
     }
-}
index 375fae953d8843f64e16314d8e88a3d463b2a7ba..210ffe7274be270f5e3f70fc175d8d1c297788a5 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.openflowplugin.impl.statistics;
 
+import javax.annotation.CheckForNull;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -18,6 +19,7 @@ import java.util.concurrent.TimeoutException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -49,7 +51,8 @@ import org.slf4j.LoggerFactory;
 public class StatisticsManagerImpl implements StatisticsManager, StatisticsManagerControlService {
 
     private static final Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
-    private final RpcProviderRegistry rpcProviderRegistry;
+
+    private static final long DEFAULT_STATS_TIMEOUT_SEC = 50L;
 
     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
 
@@ -62,7 +65,7 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
     private static long maximumTimerDelay = 900000; //wait max 15 minutes for next statistics
 
     private StatisticsWorkMode workMode = StatisticsWorkMode.COLLECTALL;
-    private Semaphore workModeGuard = new Semaphore(1, true);
+    private final Semaphore workModeGuard = new Semaphore(1, true);
     private boolean shuttingDownStatisticsPolling;
     private BindingAwareBroker.RpcRegistration<StatisticsManagerControlService> controlServiceRegistration;
 
@@ -71,13 +74,9 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
         deviceInitPhaseHandler = handler;
     }
 
-    public StatisticsManagerImpl(RpcProviderRegistry rpcProviderRegistry) {
-        this.rpcProviderRegistry = rpcProviderRegistry;
+    public StatisticsManagerImpl(@CheckForNull final RpcProviderRegistry rpcProviderRegistry, final boolean shuttingDownStatisticsPolling) {
+        Preconditions.checkArgument(rpcProviderRegistry != null);
         controlServiceRegistration = rpcProviderRegistry.addRpcImplementation(StatisticsManagerControlService.class, this);
-    }
-
-    public StatisticsManagerImpl(RpcProviderRegistry rpcProviderRegistry, final boolean shuttingDownStatisticsPolling) {
-        this(rpcProviderRegistry);
         this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling;
     }
 
@@ -89,20 +88,26 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
             LOG.trace("This is first device that delivered timer. Starting statistics polling immediately.");
             hashedWheelTimer = deviceContext.getTimer();
         }
-
-        LOG.info("Starting Statistics for master role for node:{}", deviceContext.getDeviceState().getNodeId());
-
-        final StatisticsContext statisticsContext = new StatisticsContextImpl(deviceContext);
+        final StatisticsContext statisticsContext = new StatisticsContextImpl(deviceContext, shuttingDownStatisticsPolling);
         deviceContext.addDeviceContextClosedHandler(this);
 
-        if (deviceContext.getDeviceState().getRole() == OfpRole.BECOMESLAVE) {
-            // if slave, we dont poll for statistics and jump to rpc initialization
-            LOG.info("Skipping Statistics for slave role for node:{}", deviceContext.getDeviceState().getNodeId());
+        if (shuttingDownStatisticsPolling) {
+            LOG.info("Statistics is shutdown for node:{}", deviceContext.getDeviceState().getNodeId());
+        } else {
+            LOG.info("Schedule Statistics poll for node:{}", deviceContext.getDeviceState().getNodeId());
+            if (OfpRole.BECOMEMASTER.equals(deviceContext.getDeviceState().getRole())) {
+                initialStatPollForMaster(statisticsContext, deviceContext);
+                /* we want to wait for initial statCollecting response */
+                return;
+            }
             scheduleNextPolling(deviceContext, statisticsContext, new TimeCounter());
-            deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
-            return;
         }
+        contexts.put(deviceContext, statisticsContext);
+        deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
+        deviceContext.getDeviceState().setDeviceSynchronized(true);
+    }
 
+    private void initialStatPollForMaster(final StatisticsContext statisticsContext, final DeviceContext deviceContext) {
         final ListenableFuture<Boolean> weHaveDynamicData = statisticsContext.gatherDynamicData();
         Futures.addCallback(weHaveDynamicData, new FutureCallback<Boolean>() {
             @Override
@@ -111,6 +116,7 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
                     //there are some statistics on device worth gathering
                     contexts.put(deviceContext, statisticsContext);
                     final TimeCounter timeCounter = new TimeCounter();
+                    deviceContext.getDeviceState().setStatisticsPollingEnabledProp(true);
                     scheduleNextPolling(deviceContext, statisticsContext, timeCounter);
                     LOG.trace("Device dynamic info collecting done. Going to announce raise to next level.");
                     try {
@@ -123,22 +129,15 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
                     deviceContext.getDeviceState().setDeviceSynchronized(true);
                 } else {
                     final String deviceAdress = deviceContext.getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress().toString();
-                    try {
-                        deviceContext.close();
-                    } catch (Exception e) {
-                        LOG.info("Statistics for device {} could not be gathered. Closing its device context.", deviceAdress);
-                    }
+                    LOG.info("Statistics for device {} could not be gathered. Closing its device context.", deviceAdress);
+                    deviceContext.close();
                 }
             }
 
             @Override
             public void onFailure(final Throwable throwable) {
                 LOG.warn("Statistics manager was not able to collect dynamic info for device.", deviceContext.getDeviceState().getNodeId(), throwable);
-                try {
-                    deviceContext.close();
-                } catch (Exception e) {
-                    LOG.warn("Error closing device context.", e);
-                }
+                deviceContext.close();
             }
         });
     }
@@ -151,14 +150,20 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
             LOG.debug("Session for device {} is not valid.", deviceContext.getDeviceState().getNodeId().getValue());
             return;
         }
-        LOG.debug("POLLING ALL STATS for device: {}", deviceContext.getDeviceState().getNodeId().getValue());
+        if (!deviceContext.getDeviceState().isStatisticsPollingEnabled()) {
+            LOG.debug("StatisticsPolling is disabled for device: {} , try later", deviceContext.getDeviceState().getNodeId());
+            scheduleNextPolling(deviceContext, statisticsContext, timeCounter);
+            return;
+        }
         if (OfpRole.BECOMESLAVE.equals(deviceContext.getDeviceState().getRole())) {
             LOG.debug("Role is SLAVE so we don't want to poll any stat for device: {}", deviceContext.getDeviceState().getNodeId());
             scheduleNextPolling(deviceContext, statisticsContext, timeCounter);
             return;
         }
+
+        LOG.debug("POLLING ALL STATS for device: {}", deviceContext.getDeviceState().getNodeId().getValue());
         timeCounter.markStart();
-        ListenableFuture<Boolean> deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
+        final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
         Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(final Boolean o) {
@@ -177,14 +182,20 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
             }
         });
 
-        final long STATS_TIMEOUT_SEC = 20L;
-        try {
-            deviceStatisticsCollectionFuture.get(STATS_TIMEOUT_SEC, TimeUnit.SECONDS);
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.warn("Statistics collection for node {} failed", deviceContext.getDeviceState().getNodeId(), e);
-        } catch (final TimeoutException e) {
-            LOG.info("Statistics collection for node {} still in progress even after {} secs", deviceContext.getDeviceState().getNodeId(), STATS_TIMEOUT_SEC);
-        }
+        final long averangeTime = TimeUnit.MILLISECONDS.toSeconds(timeCounter.getAverageTimeBetweenMarks());
+        final long STATS_TIMEOUT_SEC = averangeTime > 0 ? 3 * averangeTime : DEFAULT_STATS_TIMEOUT_SEC;
+        final TimerTask timerTask = new TimerTask() {
+
+            @Override
+            public void run(final Timeout timeout) throws Exception {
+                if (!deviceStatisticsCollectionFuture.isDone()) {
+                    LOG.info("Statistics collection for node {} still in progress even after {} secs", deviceContext
+                            .getDeviceState().getNodeId(), STATS_TIMEOUT_SEC);
+                    deviceStatisticsCollectionFuture.cancel(true);
+                }
+            }
+        };
+        deviceContext.getTimer().newTimeout(timerTask, STATS_TIMEOUT_SEC, TimeUnit.SECONDS);
     }
 
     private void scheduleNextPolling(final DeviceContext deviceContext,
@@ -193,7 +204,7 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
         if (null != hashedWheelTimer) {
             LOG.debug("SCHEDULING NEXT STATS POLLING for device: {}", deviceContext.getDeviceState().getNodeId().getValue());
             if (!shuttingDownStatisticsPolling) {
-                Timeout pollTimeout = hashedWheelTimer.newTimeout(new TimerTask() {
+                final Timeout pollTimeout = hashedWheelTimer.newTimeout(new TimerTask() {
                     @Override
                     public void run(final Timeout timeout) throws Exception {
                         pollStatistics(deviceContext, statisticsContext, timeCounter);
@@ -208,7 +219,7 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
 
     @VisibleForTesting
     protected void calculateTimerDelay(final TimeCounter timeCounter) {
-        long averageStatisticsGatheringTime = timeCounter.getAverageTimeBetweenMarks();
+        final long averageStatisticsGatheringTime = timeCounter.getAverageTimeBetweenMarks();
         if (averageStatisticsGatheringTime > currentTimerDelay) {
             currentTimerDelay *= 2;
             if (currentTimerDelay > maximumTimerDelay) {
@@ -230,20 +241,16 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
 
     @Override
     public void onDeviceContextClosed(final DeviceContext deviceContext) {
-        StatisticsContext statisticsContext = contexts.remove(deviceContext);
+        final StatisticsContext statisticsContext = contexts.remove(deviceContext);
         if (null != statisticsContext) {
             LOG.trace("Removing device context from stack. No more statistics gathering for node {}", deviceContext.getDeviceState().getNodeId());
-            try {
-                statisticsContext.close();
-            } catch (Exception e) {
-                LOG.debug("Error closing statistic context for node {}.", deviceContext.getDeviceState().getNodeId());
-            }
+            statisticsContext.close();
         }
     }
 
     @Override
     public Future<RpcResult<GetStatisticsWorkModeOutput>> getStatisticsWorkMode() {
-        GetStatisticsWorkModeOutputBuilder smModeOutputBld = new GetStatisticsWorkModeOutputBuilder();
+        final GetStatisticsWorkModeOutputBuilder smModeOutputBld = new GetStatisticsWorkModeOutputBuilder();
         smModeOutputBld.setMode(workMode);
         return RpcResultBuilder.success(smModeOutputBld.build()).buildFuture();
     }
@@ -257,13 +264,13 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
             if (!workMode.equals(targetWorkMode)) {
                 shuttingDownStatisticsPolling = StatisticsWorkMode.FULLYDISABLED.equals(targetWorkMode);
                 // iterate through stats-ctx: propagate mode
-                for (Map.Entry<DeviceContext, StatisticsContext> contextEntry : contexts.entrySet()) {
+                for (final Map.Entry<DeviceContext, StatisticsContext> contextEntry : contexts.entrySet()) {
                     final DeviceContext deviceContext = contextEntry.getKey();
                     final StatisticsContext statisticsContext = contextEntry.getValue();
                     switch (targetWorkMode) {
                         case COLLECTALL:
                             scheduleNextPolling(deviceContext, statisticsContext, new TimeCounter());
-                            for (ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
+                            for (final ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
                                 lifeCycleSource.setItemLifecycleListener(null);
                             }
                             break;
@@ -272,7 +279,7 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
                             if (pollTimeout.isPresent()) {
                                 pollTimeout.get().cancel();
                             }
-                            for (ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
+                            for (final ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
                                 lifeCycleSource.setItemLifecycleListener(statisticsContext.getItemLifeCycleListener());
                             }
                             break;
@@ -298,5 +305,9 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
             controlServiceRegistration.close();
             controlServiceRegistration = null;
         }
+        for (final StatisticsContext statCtx : contexts.values()) {
+            statCtx.close();
+        }
+        contexts.clear();
     }
 }
index 09310a4e397f6dddf630c1626f7182fac10c65f0..edc63863781bea47d86e793a740cc4017b2fa15c 100644 (file)
@@ -435,7 +435,7 @@ public class DeviceInitializationUtils {
 
         final OutboundQueue queue = deviceContext.getPrimaryConnectionContext().getOutboundQueueProvider();
 
-        final Long reserved = deviceContext.getReservedXid();
+        final Long reserved = deviceContext.reservedXidForDeviceMessage();
         final RequestContext<List<MultipartReply>> requestContext = new AbstractRequestContext<List<MultipartReply>>(
                 reserved) {
             @Override
index c7edc72d8a82b065c32f367ca49be9ff3901f001..aee32aea5784269831b51cb1ea7e1b1fbb991c7e 100644 (file)
@@ -10,7 +10,10 @@ package org.opendaylight.openflowplugin.impl.util;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import com.google.common.reflect.TypeToken;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.CheckForNull;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
@@ -61,42 +64,71 @@ public class MdSalRegistratorUtils {
         throw new IllegalStateException();
     }
 
+    /**
+     * Method registers all OF services for role {@link OfpRole#BECOMEMASTER}
+     * @param rpcContext - registration processing is implemented in {@link RpcContext}
+     * @param deviceContext - every service needs {@link DeviceContext} as input parameter
+     * @param newRole - role validation for {@link OfpRole#BECOMEMASTER}
+     */
+    public static void registerMasterServices(@CheckForNull final RpcContext rpcContext,
+            @CheckForNull final DeviceContext deviceContext, @CheckForNull final OfpRole newRole) {
+        Preconditions.checkArgument(rpcContext != null);
+        Preconditions.checkArgument(deviceContext != null);
+        Preconditions.checkArgument(newRole != null);
+        Verify.verify(OfpRole.BECOMEMASTER.equals(newRole), "Service call with bad Role {} we expect role BECOMEMASTER", newRole);
 
-    public static void registerServices(final RpcContext rpcContext, final DeviceContext deviceContext, final OfpRole newRole) {
         rpcContext.registerRpcServiceImplementation(SalEchoService.class, new SalEchoServiceImpl(rpcContext, deviceContext));
 
-        if (OfpRole.BECOMEMASTER.equals(newRole)) {
-            rpcContext.registerRpcServiceImplementation(SalFlowService.class, new SalFlowServiceImpl(rpcContext, deviceContext));
-            //TODO: add constructors with rcpContext and deviceContext to meter, group, table constructors
-            rpcContext.registerRpcServiceImplementation(FlowCapableTransactionService.class, new FlowCapableTransactionServiceImpl(rpcContext, deviceContext));
-            rpcContext.registerRpcServiceImplementation(SalMeterService.class, new SalMeterServiceImpl(rpcContext, deviceContext));
-            rpcContext.registerRpcServiceImplementation(SalGroupService.class, new SalGroupServiceImpl(rpcContext, deviceContext));
-            rpcContext.registerRpcServiceImplementation(SalTableService.class, new SalTableServiceImpl(rpcContext, deviceContext));
-            rpcContext.registerRpcServiceImplementation(SalPortService.class, new SalPortServiceImpl(rpcContext, deviceContext));
-            rpcContext.registerRpcServiceImplementation(PacketProcessingService.class, new PacketProcessingServiceImpl(rpcContext, deviceContext));
-            rpcContext.registerRpcServiceImplementation(NodeConfigService.class, new NodeConfigServiceImpl(rpcContext, deviceContext));
-            rpcContext.registerRpcServiceImplementation(OpendaylightFlowStatisticsService.class, new OpendaylightFlowStatisticsServiceImpl(rpcContext, deviceContext));
-            // TODO: experimenter symmetric and multipart message services
-            rpcContext.registerRpcServiceImplementation(SalExperimenterMessageService.class, new SalExperimenterMessageServiceImpl(rpcContext, deviceContext));
+        rpcContext.registerRpcServiceImplementation(SalFlowService.class, new SalFlowServiceImpl(rpcContext, deviceContext));
+        //TODO: add constructors with rcpContext and deviceContext to meter, group, table constructors
+        rpcContext.registerRpcServiceImplementation(FlowCapableTransactionService.class, new FlowCapableTransactionServiceImpl(rpcContext, deviceContext));
+        rpcContext.registerRpcServiceImplementation(SalMeterService.class, new SalMeterServiceImpl(rpcContext, deviceContext));
+        rpcContext.registerRpcServiceImplementation(SalGroupService.class, new SalGroupServiceImpl(rpcContext, deviceContext));
+        rpcContext.registerRpcServiceImplementation(SalTableService.class, new SalTableServiceImpl(rpcContext, deviceContext));
+        rpcContext.registerRpcServiceImplementation(SalPortService.class, new SalPortServiceImpl(rpcContext, deviceContext));
+        rpcContext.registerRpcServiceImplementation(PacketProcessingService.class, new PacketProcessingServiceImpl(rpcContext, deviceContext));
+        rpcContext.registerRpcServiceImplementation(NodeConfigService.class, new NodeConfigServiceImpl(rpcContext, deviceContext));
+        rpcContext.registerRpcServiceImplementation(OpendaylightFlowStatisticsService.class, new OpendaylightFlowStatisticsServiceImpl(rpcContext, deviceContext));
+        // TODO: experimenter symmetric and multipart message services
+        rpcContext.registerRpcServiceImplementation(SalExperimenterMessageService.class, new SalExperimenterMessageServiceImpl(rpcContext, deviceContext));
+    }
+
+    /**
+     * Method unregisters all services in first step. So we don't need to call {@link MdSalRegistratorUtils#unregisterServices(RpcContext)}
+     * directly before by change role from {@link OfpRole#BECOMEMASTER} to {@link OfpRole#BECOMESLAVE}.
+     * Method registers {@link SalEchoService} in next step only because we would like to have SalEchoService as local service for all apps
+     * to be able actively check connection status for slave connection too.
+     * @param rpcContext - registration/unregistration processing is implemented in {@link RpcContext}
+     * @param newRole - role validation for {@link OfpRole#BECOMESLAVE}
+     */
+    public static void registerSlaveServices(@CheckForNull final RpcContext rpcContext, @CheckForNull final OfpRole newRole) {
+        Preconditions.checkArgument(rpcContext != null);
+        Preconditions.checkArgument(newRole != null);
+        Verify.verify(OfpRole.BECOMESLAVE.equals(newRole), "Service call with bad Role {} we expect role BECOMESLAVE", newRole);
 
-        } else if (OfpRole.BECOMESLAVE.equals(newRole)) {
-            rpcContext.unregisterRpcServiceImplementation(SalFlowService.class);
-            //TODO: add constructors with rcpContext and deviceContext to meter, group, table constructors
-            rpcContext.unregisterRpcServiceImplementation(FlowCapableTransactionService.class);
-            rpcContext.unregisterRpcServiceImplementation(SalMeterService.class);
-            rpcContext.unregisterRpcServiceImplementation(SalGroupService.class);
-            rpcContext.unregisterRpcServiceImplementation(SalTableService.class);
-           rpcContext.unregisterRpcServiceImplementation(SalPortService.class);
-            rpcContext.unregisterRpcServiceImplementation(PacketProcessingService.class);
-            rpcContext.unregisterRpcServiceImplementation(NodeConfigService.class);
-            rpcContext.unregisterRpcServiceImplementation(OpendaylightFlowStatisticsService.class);
-            // TODO: experimenter symmetric and multipart message services
-            rpcContext.unregisterRpcServiceImplementation(SalExperimenterMessageService.class);
-        }
+        unregisterServices(rpcContext);
     }
 
-    public static void unregisterServices(final RpcContext rpcContext) throws Exception {
-        rpcContext.close();
+    /**
+     * Method unregisters all OF services.
+     * @param rpcContext - unregistration processing is implemented in {@link RpcContext}
+     */
+    public static void unregisterServices(@CheckForNull final RpcContext rpcContext) {
+        Preconditions.checkArgument(rpcContext != null);
+
+        rpcContext.unregisterRpcServiceImplementation(SalEchoService.class);
+        rpcContext.unregisterRpcServiceImplementation(SalFlowService.class);
+        //TODO: add constructors with rcpContext and deviceContext to meter, group, table constructors
+        rpcContext.unregisterRpcServiceImplementation(FlowCapableTransactionService.class);
+        rpcContext.unregisterRpcServiceImplementation(SalMeterService.class);
+        rpcContext.unregisterRpcServiceImplementation(SalGroupService.class);
+        rpcContext.unregisterRpcServiceImplementation(SalTableService.class);
+        rpcContext.unregisterRpcServiceImplementation(SalPortService.class);
+        rpcContext.unregisterRpcServiceImplementation(PacketProcessingService.class);
+        rpcContext.unregisterRpcServiceImplementation(NodeConfigService.class);
+        rpcContext.unregisterRpcServiceImplementation(OpendaylightFlowStatisticsService.class);
+        // TODO: experimenter symmetric and multipart message services
+        rpcContext.unregisterRpcServiceImplementation(SalExperimenterMessageService.class);
     }
 
     /**
@@ -116,7 +148,7 @@ public class MdSalRegistratorUtils {
                 rpcContext.lookupRpcService(OpendaylightFlowStatisticsService.class));
         Preconditions.checkArgument(COMPOSITE_SERVICE_TYPE_TOKEN.isAssignableFrom(flowStatisticsService.getClass()));
         // attach delegate to flow statistics service (to cover all but aggregated stats with match filter input)
-        OpendaylightFlowStatisticsServiceDelegateImpl flowStatisticsDelegate =
+        final OpendaylightFlowStatisticsServiceDelegateImpl flowStatisticsDelegate =
                 new OpendaylightFlowStatisticsServiceDelegateImpl(rpcContext, deviceContext, notificationPublishService, new AtomicLong());
         ((Delegator<OpendaylightFlowStatisticsService>) flowStatisticsService).setDelegate(flowStatisticsDelegate);
 
index 7a067d9e1e5d0365897528212f5da1f11fe50017..7be1d00e1610a038baa1b1fc4743f0e357eca288 100644 (file)
@@ -207,7 +207,7 @@ public class DeviceContextImplTest {
                 org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved.class.getName()))))
                 .thenReturn(messageTranslatorFlowRemoved);
 
-        deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary);
+        deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, false);
 
         xid = new Xid(atomicLong.incrementAndGet());
         xidMulti = new Xid(atomicLong.incrementAndGet());
@@ -215,17 +215,17 @@ public class DeviceContextImplTest {
 
     @Test(expected = NullPointerException.class)
     public void testDeviceContextImplConstructorNullDataBroker() throws Exception {
-        new DeviceContextImpl(connectionContext, deviceState, null, timer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary).close();
+        new DeviceContextImpl(connectionContext, deviceState, null, timer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, false).close();
     }
 
     @Test(expected = NullPointerException.class)
     public void testDeviceContextImplConstructorNullDeviceState() throws Exception {
-        new DeviceContextImpl(connectionContext, null, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary).close();
+        new DeviceContextImpl(connectionContext, null, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, false).close();
     }
 
     @Test(expected = NullPointerException.class)
     public void testDeviceContextImplConstructorNullTimer() throws Exception {
-        new DeviceContextImpl(null, deviceState, dataBroker, null, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary).close();
+        new DeviceContextImpl(null, deviceState, dataBroker, null, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, false).close();
     }
 
     @Test
@@ -255,7 +255,7 @@ public class DeviceContextImplTest {
 
     @Test
     public void testGetReservedXid() {
-        deviceContext.getReservedXid();
+        deviceContext.reservedXidForDeviceMessage();
         verify(outboundQueueProvider).reserveEntry();
     }
 
index 94ef781bcad22c4c68eba75298c22381a11c4a4b..e4154f6d9afe641ce638f95a05743153da7abd9a 100644 (file)
@@ -146,7 +146,7 @@ public class DeviceManagerImplTest {
 
         MessageIntelligenceAgency mockedMessageIntelligenceAgency = mock(MessageIntelligenceAgency.class);
         DeviceManagerImpl deviceManager = new DeviceManagerImpl(mockedDataBroker, mockedMessageIntelligenceAgency,
-                TEST_VALUE_GLOBAL_NOTIFICATION_QUOTA);
+                TEST_VALUE_GLOBAL_NOTIFICATION_QUOTA, false);
         deviceManager.setDeviceInitializationPhaseHandler(deviceInitPhaseHandler);
 
         return deviceManager;
index b604a8e49964aee60cf30dd1fb164834c68726b0..9779f941327d5a57832a663dca39772081fe78a1 100644 (file)
@@ -9,7 +9,6 @@ package org.opendaylight.openflowplugin.impl.role;
 
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -34,6 +33,7 @@ import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
+import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
@@ -83,6 +83,8 @@ public class RoleContextImplTest {
 
     @Mock
     private FeaturesReply featuresReply;
+    @Mock
+    private MessageSpy mockedMessageSpy;
 
     private final NodeId nodeId = NodeId.getDefaultInstance("openflow:1");
     private final KeyedInstanceIdentifier<Node, NodeKey> instanceIdentifier = DeviceStateUtil.createNodeInstanceIdentifier(nodeId);
@@ -94,15 +96,17 @@ public class RoleContextImplTest {
     public void setup() throws CandidateAlreadyRegisteredException {
         when(deviceContext.getPrimaryConnectionContext()).thenReturn(connectionContext);
         when(deviceContext.getDeviceState()).thenReturn(deviceState);
+        when(deviceContext.getMessageSpy()).thenReturn(mockedMessageSpy);
         when(connectionContext.getNodeId()).thenReturn(nodeId);
         when(deviceState.getNodeInstanceIdentifier()).thenReturn(instanceIdentifier);
         when(deviceState.getNodeId()).thenReturn(nodeId);
         when(rpcProviderRegistry.getRpcService(SalRoleService.class)).thenReturn(salRoleService);
         when(deviceState.getFeatures()).thenReturn(getFeaturesOutput);
-        when(getFeaturesOutput.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_3);
+        when(getFeaturesOutput.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_0);
         when(deviceContext.getPrimaryConnectionContext().getFeatures()).thenReturn(featuresReply);
         when(deviceContext.getPrimaryConnectionContext().getConnectionState()).thenReturn(ConnectionContext.CONNECTION_STATE.WORKING);
-        when(deviceContext.onClusterRoleChange(Matchers.<OfpRole>any())).thenReturn(Futures.immediateFuture((Void) null));
+        when(deviceContext.onClusterRoleChange(Matchers.<OfpRole>any(), Matchers.<OfpRole>any()))
+                .thenReturn(Futures.immediateFuture((Void) null));
 
         roleContext = new RoleContextImpl(deviceContext, entityOwnershipService, entity, txEntity);
         roleContext.initialization();
@@ -123,7 +127,7 @@ public class RoleContextImplTest {
         final ListenableFuture<Void> onRoleChanged = roleContext.onRoleChanged(oldRole, newRole);
         onRoleChanged.get(FUTURE_SAFETY_TIMEOUT, TimeUnit.SECONDS);
 
-        verify(deviceContext).onClusterRoleChange(newRole);
+        verify(deviceContext).onClusterRoleChange(oldRole, newRole);
     }
 
     @Test
@@ -141,7 +145,7 @@ public class RoleContextImplTest {
         final ListenableFuture<Void> onRoleChanged = roleContext.onRoleChanged(oldRole, newRole);
         onRoleChanged.get(5, TimeUnit.SECONDS);
 
-        verify(deviceContext, Mockito.never()).onClusterRoleChange(newRole);
+        verify(deviceContext, Mockito.never()).onClusterRoleChange(oldRole, newRole);
     }
 
     @Test
@@ -155,12 +159,11 @@ public class RoleContextImplTest {
                 .thenReturn(future);
 
         roleContext.setSalRoleService(salRoleService);
-        roleContext.promoteStateToWorking();
 
         final ListenableFuture<Void> onRoleChanged = roleContext.onRoleChanged(oldRole, newRole);
         onRoleChanged.get(5, TimeUnit.SECONDS);
 
-        verify(deviceContext).onClusterRoleChange(newRole);
+        verify(deviceContext).onClusterRoleChange(oldRole, newRole);
     }
 
     @Test
@@ -179,7 +182,7 @@ public class RoleContextImplTest {
         final ListenableFuture<Void> onRoleChanged = roleContext.onRoleChanged(oldRole, newRole);
         onRoleChanged.get(5, TimeUnit.SECONDS);
 
-        verify(deviceContext).onClusterRoleChange(newRole);
+        verify(deviceContext).onClusterRoleChange(oldRole, newRole);
     }
 
     private class SetRoleInputMatcher extends ArgumentMatcher<SetRoleInput> {
index 61cf8deae50b648996661a3b3dd37aa6b5998779..1b9f7963d68821db02f6a4fae854f2cf30961d3b 100644 (file)
@@ -15,9 +15,7 @@ import static com.google.common.util.concurrent.Futures.immediateFuture;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Arrays;
 import java.util.Collections;
@@ -36,8 +34,8 @@ import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 public class StatisticsContextImplParamTest extends StatisticsContextImpMockInitiation {
 
 
-    public StatisticsContextImplParamTest(boolean isTable, boolean isFlow, boolean isGroup, boolean isMeter, boolean isPort,
-                                          boolean isQueue) {
+    public StatisticsContextImplParamTest(final boolean isTable, final boolean isFlow, final boolean isGroup, final boolean isMeter, final boolean isPort,
+                                          final boolean isQueue) {
         super();
         this.isTable = isTable;
         this.isFlow = isFlow;
@@ -65,9 +63,9 @@ public class StatisticsContextImplParamTest extends StatisticsContextImpMockInit
     @Test
     public void gatherDynamicDataTest() {
 
-        StatisticsContextImpl statisticsContext = new StatisticsContextImpl(mockedDeviceContext);
+        final StatisticsContextImpl statisticsContext = new StatisticsContextImpl(mockedDeviceContext, false);
 
-        ListenableFuture<RpcResult<List<MultipartReply>>> rpcResult = immediateFuture(RpcResultBuilder.success(Collections.<MultipartReply>emptyList()).build());
+        final ListenableFuture<RpcResult<List<MultipartReply>>> rpcResult = immediateFuture(RpcResultBuilder.success(Collections.<MultipartReply>emptyList()).build());
         when(mockedStatisticsGatheringService.getStatisticsOfType(any(EventIdentifier.class), any(MultipartType
                 .class))).thenReturn(rpcResult);
         when(mockedStatisticsOnFlyGatheringService.getStatisticsOfType(any(EventIdentifier.class), any(MultipartType
@@ -76,7 +74,7 @@ public class StatisticsContextImplParamTest extends StatisticsContextImpMockInit
         statisticsContext.setStatisticsGatheringService(mockedStatisticsGatheringService);
         statisticsContext.setStatisticsGatheringOnTheFlyService(mockedStatisticsOnFlyGatheringService);
 
-        ListenableFuture<Boolean> futureResult = statisticsContext.gatherDynamicData();
+        final ListenableFuture<Boolean> futureResult = statisticsContext.gatherDynamicData();
 
         try {
             assertTrue(futureResult.get());
index 25a153592e731dc210d9b4443691afdc7c44f719..5035051822de4ae035ffabe140175162747030e6 100644 (file)
@@ -16,7 +16,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Collections;
@@ -48,19 +47,19 @@ public class StatisticsContextImplTest extends StatisticsContextImpMockInitiatio
 
     @Before
     public void setUp() throws Exception {
-        when(mockedDeviceContext.getReservedXid()).thenReturn(TEST_XID);
+        when(mockedDeviceContext.reservedXidForDeviceMessage()).thenReturn(TEST_XID);
         initStatisticsContext();
     }
 
     private void initStatisticsContext() {
-        statisticsContext = new StatisticsContextImpl(mockedDeviceContext);
+        statisticsContext = new StatisticsContextImpl(mockedDeviceContext, false);
         statisticsContext.setStatisticsGatheringService(mockedStatisticsGatheringService);
         statisticsContext.setStatisticsGatheringOnTheFlyService(mockedStatisticsOnFlyGatheringService);
     }
 
     @Test
     public void testCreateRequestContext() {
-        RequestContext<Object> requestContext = statisticsContext.createRequestContext();
+        final RequestContext<Object> requestContext = statisticsContext.createRequestContext();
         assertNotNull(requestContext);
         assertEquals(TEST_XID, requestContext.getXid().getValue());
         Assert.assertFalse(requestContext.getFuture().isDone());
@@ -71,7 +70,7 @@ public class StatisticsContextImplTest extends StatisticsContextImpMockInitiatio
      */
     @Test
     public void testClose() throws Exception {
-        StatisticsContextImpl statisticsContext = new StatisticsContextImpl(mockedDeviceContext);
+        final StatisticsContextImpl statisticsContext = new StatisticsContextImpl(mockedDeviceContext, false);
         final RequestContext<Object> requestContext = statisticsContext.createRequestContext();
         statisticsContext.close();
         try {
@@ -79,7 +78,7 @@ public class StatisticsContextImplTest extends StatisticsContextImpMockInitiatio
             final RpcResult<?> rpcResult = requestContext.getFuture().get();
             Assert.assertFalse(rpcResult.isSuccessful());
             Assert.assertFalse(rpcResult.isSuccessful());
-        } catch (Exception e) {
+        } catch (final Exception e) {
             LOG.error("request future value should be finished", e);
             Assert.fail("request context closing failed");
         }
@@ -139,7 +138,7 @@ public class StatisticsContextImplTest extends StatisticsContextImpMockInitiatio
         try {
             deviceConnectionCheckResult.get();
             Assert.fail("connection in state RIP should have caused exception here");
-        } catch (Exception e) {
+        } catch (final Exception e) {
             LOG.debug("expected behavior for RIP connection achieved");
             Assert.assertTrue(e instanceof ExecutionException);
         }
@@ -155,7 +154,7 @@ public class StatisticsContextImplTest extends StatisticsContextImpMockInitiatio
         try {
             final Boolean checkPositive = deviceConnectionCheckResult.get();
             Assert.assertTrue(checkPositive);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             Assert.fail("connection in state HANDSHAKING should NOT have caused exception here");
         }
     }
index 8c0b0bc2e72149962c5c652dd17040c2c3265be6..7a27bb3177749210dcf25ef9fe26a673818dea70 100644 (file)
@@ -142,7 +142,7 @@ public class StatisticsManagerImplTest {
                 Matchers.eq(StatisticsManagerControlService.class),
                 Matchers.<StatisticsManagerControlService>any())).thenReturn(serviceControlRegistration);
 
-        statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry);
+        statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, false);
     }
 
     @Test
@@ -163,7 +163,7 @@ public class StatisticsManagerImplTest {
         statisticsManager.onDeviceContextLevelUp(mockedDeviceContext);
 
         verify(mockedDeviceContext).addDeviceContextClosedHandler(statisticsManager);
-        verify(mockedDeviceContext, Mockito.times(8)).getReservedXid();
+        verify(mockedDeviceContext, Mockito.never()).reservedXidForDeviceMessage();
         verify(mockedDeviceState).setDeviceSynchronized(true);
         verify(mockedDevicePhaseHandler).onDeviceContextLevelUp(mockedDeviceContext);
         verify(hashedWheelTimer).newTimeout(Matchers.<TimerTask>any(), Matchers.anyLong(), Matchers.<TimeUnit>any());
@@ -188,7 +188,7 @@ public class StatisticsManagerImplTest {
         statisticsManager.onDeviceContextLevelUp(mockedDeviceContext);
 
         verify(mockedDeviceContext).addDeviceContextClosedHandler(statisticsManager);
-        verify(mockedDeviceContext, Mockito.times(8)).getReservedXid();
+        verify(mockedDeviceContext, Mockito.never()).reservedXidForDeviceMessage();
         verify(mockedDeviceState).setDeviceSynchronized(true);
         verify(mockedDevicePhaseHandler).onDeviceContextLevelUp(mockedDeviceContext);
         verify(hashedWheelTimer, Mockito.never()).newTimeout(Matchers.<TimerTask>any(), Matchers.anyLong(), Matchers.<TimeUnit>any());
index bb2b353d02374aa5b8d0c4964ed6fc5658bda9be..f8f08b6ad38d483cc6956d76d2eee2b4f331ff21 100644 (file)
@@ -30,7 +30,7 @@ import org.opendaylight.yangtools.yang.binding.RpcService;
 public class MdSalRegistratorUtilsTest {
 
     /**
-     * Number of currently registrated services (can be changed) in {@link MdSalRegistratorUtils#registerServices
+     * Number of currently registrated services (can be changed) in {@link MdSalRegistratorUtils#registerMasterServices
      * (RpcContext, DeviceContext)}
      */
     private static final int NUMBER_OF_RPC_SERVICE_REGISTRATION = 11;
@@ -49,7 +49,7 @@ public class MdSalRegistratorUtilsTest {
         when(mockedFeatures.getDatapathId()).thenReturn(mockedDataPathId);
 
         when(mockedDeviceContext.getPrimaryConnectionContext()).thenReturn(mockedConnectionContext);
-        MdSalRegistratorUtils.registerServices(mockedRpcContext,mockedDeviceContext, OfpRole.BECOMEMASTER);
+        MdSalRegistratorUtils.registerMasterServices(mockedRpcContext,mockedDeviceContext, OfpRole.BECOMEMASTER);
         verify(mockedRpcContext, times(NUMBER_OF_RPC_SERVICE_REGISTRATION)).registerRpcServiceImplementation(any
                         (RpcService.class.getClass()), any(RpcService.class));
     }