Merge "Bug-4957 Cluster Role change fix"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / rpc / RpcManagerImpl.java
index d34914422e6dbfafbcf86b5cc82dda3e1202f404..67d08bb0a72c3d0fa2dc9e71d3e5eddf6e2dcd30 100644 (file)
@@ -7,22 +7,34 @@
  */
 package org.opendaylight.openflowplugin.impl.rpc;
 
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
 import org.opendaylight.openflowplugin.impl.util.MdSalRegistratorUtils;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RpcManagerImpl implements RpcManager {
 
-    private final ProviderContext providerContext;
+    private static final Logger LOG = LoggerFactory.getLogger(RpcManagerImpl.class);
+    private final RpcProviderRegistry rpcProviderRegistry;
     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
-    private final Long maxRequestsQuota;
+    private final int maxRequestsQuota;
+    private final ConcurrentHashMap<DeviceContext, RpcContext> contexts = new ConcurrentHashMap<>();
+    private boolean isStatisticsRpcEnabled;
+    private NotificationPublishService notificationPublishService;
 
-    public RpcManagerImpl(final ProviderContext providerContext,
-                          final Long quotaValue) {
-        this.providerContext = providerContext;
+    public RpcManagerImpl(final RpcProviderRegistry rpcProviderRegistry,
+                          final int quotaValue) {
+        this.rpcProviderRegistry = rpcProviderRegistry;
         maxRequestsQuota = quotaValue;
     }
 
@@ -32,12 +44,68 @@ public class RpcManagerImpl implements RpcManager {
     }
 
     @Override
-    public void onDeviceContextLevelUp(final DeviceContext deviceContext) {
-        final RpcContext rpcContext = new RpcContextImpl(providerContext, deviceContext);
-        rpcContext.setRequestContextQuota(maxRequestsQuota.intValue());
-        deviceContext.setDeviceDisconnectedHandler(rpcContext);
-        MdSalRegistratorUtils.registerServices(rpcContext, deviceContext);
+    public void onDeviceContextLevelUp(final DeviceContext deviceContext) throws Exception {
+        final NodeId nodeId = deviceContext.getDeviceState().getNodeId();
+        final OfpRole ofpRole = deviceContext.getDeviceState().getRole();
+
+        LOG.debug("Node:{}, deviceContext.getDeviceState().getRole():{}", nodeId, ofpRole);
+
+        RpcContext rpcContext = contexts.get(deviceContext);
+        if (rpcContext == null) {
+            rpcContext = new RpcContextImpl(deviceContext.getMessageSpy(), rpcProviderRegistry, deviceContext, maxRequestsQuota);
+            contexts.put(deviceContext, rpcContext);
+        }
+
+        deviceContext.addDeviceContextClosedHandler(this);
+
+        if (OfpRole.BECOMEMASTER.equals(ofpRole)) {
+            LOG.info("Registering Openflow RPCs for node:{}, role:{}", nodeId, ofpRole);
+            MdSalRegistratorUtils.registerMasterServices(rpcContext, deviceContext, ofpRole);
+
+            if (isStatisticsRpcEnabled) {
+                MdSalRegistratorUtils.registerStatCompatibilityServices(rpcContext, deviceContext,
+                        notificationPublishService, new AtomicLong());
+            }
+        } else if(OfpRole.BECOMESLAVE.equals(ofpRole)) {
+            // if slave, we need to de-register rpcs if any have been registered, in case of master to slave
+            LOG.info("Unregistering RPC registration (if any) for slave role for node:{}", deviceContext.getDeviceState().getNodeId());
+            MdSalRegistratorUtils.registerSlaveServices(rpcContext, ofpRole);
+        } else {
+            // if we don't know role, we need to unregister rpcs if any have been registered
+            LOG.info("Unregistering RPC registration (if any) for slave role for node:{}", deviceContext.getDeviceState().getNodeId());
+            MdSalRegistratorUtils.unregisterServices(rpcContext);
+        }
+
         // finish device initialization cycle back to DeviceManager
         deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
     }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+
+
+    @Override
+    public void onDeviceContextClosed(final DeviceContext deviceContext) {
+        final RpcContext removedContext = contexts.remove(deviceContext);
+        if (removedContext != null) {
+            try {
+                LOG.info("Unregistering rpcs for device context closure");
+                removedContext.close();
+            } catch (final Exception e) {
+                LOG.error("Exception while unregistering rpcs onDeviceContextClosed handler for node:{}. But continuing.",
+                        deviceContext.getDeviceState().getNodeId(), e);
+            }
+        }
+    }
+    @Override
+    public void setStatisticsRpcEnabled(final boolean isStatisticsRpcEnabled) {
+        this.isStatisticsRpcEnabled = isStatisticsRpcEnabled;
+    }
+
+    @Override
+    public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
+        this.notificationPublishService = notificationPublishService;
+    }
 }