Refactor NodeListener
[openflowplugin.git] / applications / southbound-cli / src / main / java / org / opendaylight / openflowplugin / applications / southboundcli / ReconciliationServiceImpl.java
index f09f748fd90dea53ce32c44808716e442c0b414d..2ba7caa548708f4c039e62f1d8472fae995b1071 100644 (file)
@@ -15,6 +15,7 @@ import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.R
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import java.lang.management.ManagementFactory;
 import java.text.SimpleDateFormat;
 import java.time.LocalDateTime;
 import java.util.Date;
@@ -26,6 +27,13 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.binding.api.DataBroker;
 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
@@ -33,9 +41,8 @@ import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
 import org.opendaylight.openflowplugin.api.openflow.ReconciliationState;
 import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
-import org.opendaylight.openflowplugin.applications.southboundcli.alarm.AlarmAgent;
+import org.opendaylight.openflowplugin.applications.southboundcli.alarm.NodeReconciliationAlarm;
 import org.opendaylight.openflowplugin.applications.southboundcli.util.OFNode;
-import org.opendaylight.openflowplugin.applications.southboundcli.util.ShellUtil;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
@@ -60,29 +67,59 @@ import org.opendaylight.yangtools.yang.common.Uint64;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ReconciliationServiceImpl implements ReconciliationService, AutoCloseable {
+// FIXME: this is not just a CLI component, it should live somewhere else
+public final class ReconciliationServiceImpl implements ReconciliationService, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(ReconciliationServiceImpl.class);
+    private static final ObjectName ALARM_NAME;
 
-    private final DataBroker broker;
-    private final FlowNodeReconciliation flowNodeReconciliation;
-    private final AlarmAgent alarmAgent;
-    private final NodeListener nodeListener;
+    static {
+        try {
+            ALARM_NAME = new ObjectName("SDNC.FM:name=NodeReconciliationOperationOngoingBean");
+        } catch (MalformedObjectNameException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    private final NodeReconciliationAlarm alarm = new NodeReconciliationAlarm();
     private final Map<String, ReconciliationState> reconciliationStates;
+    private final FlowNodeReconciliation flowNodeReconciliation;
+    private final DpnTracker dpnTracker;
+    private final DataBroker broker;
 
     private ExecutorService executor = Executors.newWorkStealingPool(10);
+    private boolean unregister;
 
     public ReconciliationServiceImpl(final DataBroker broker, final ForwardingRulesManager frm,
-            final AlarmAgent alarmAgent, final NodeListener nodeListener,
-            final FlowGroupCacheManager flowGroupCacheManager) {
+            final DpnTracker dpnTracker, final FlowGroupCacheManager flowGroupCacheManager) {
         this.broker = requireNonNull(broker);
         flowNodeReconciliation = frm.getFlowNodeReconciliation();
-        this.alarmAgent = requireNonNull(alarmAgent);
-        this.nodeListener = requireNonNull(nodeListener);
+        this.dpnTracker = requireNonNull(dpnTracker);
         reconciliationStates = flowGroupCacheManager.getReconciliationStates();
+
+        unregister = false;
+        final var mbs = ManagementFactory.getPlatformMBeanServer();
+        if (!mbs.isRegistered(ALARM_NAME)) {
+            try {
+                mbs.registerMBean(alarm, ALARM_NAME);
+                unregister = true;
+                LOG.info("Registered Mbean {} successfully", ALARM_NAME);
+            } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
+                LOG.error("Registeration failed for Mbean {}", ALARM_NAME, e);
+            }
+        }
     }
 
     @Override
     public void close() {
+        if (unregister) {
+            unregister = false;
+            try {
+                ManagementFactory.getPlatformMBeanServer().unregisterMBean(ALARM_NAME);
+            } catch (MBeanRegistrationException | InstanceNotFoundException e) {
+                LOG.error("Unregisteration failed for Mbean {}", ALARM_NAME, e);
+            }
+        }
+
         if (executor != null) {
             executor.shutdownNow();
             executor = null;
@@ -120,11 +157,14 @@ public class ReconciliationServiceImpl implements ReconciliationService, AutoClo
                 if (state != null && state.getState().equals(STARTED)) {
                     inprogressNodes.add(Uint64.valueOf(nodeId));
                 } else {
-                    alarmAgent.raiseNodeReconciliationAlarm(nodeId);
+                    final var alarmText = getAlarmText(nodeId,  " started reconciliation");
+                    final var source = getSourceText(nodeId);
+                    LOG.debug("Raising NodeReconciliationOperationOngoing alarm, alarmText {} source {}", alarmText,
+                        source);
+                    alarm.raiseAlarm("NodeReconciliationOperationOngoing", alarmText, source);
                     LOG.info("Executing reconciliation for node {} with state ", nodeId);
                     NodeKey nodeKey = new NodeKey(new NodeId("openflow:" + nodeId));
-                    ReconciliationTask reconcileTask = new ReconciliationTask(Uint64.valueOf(nodeId), nodeKey);
-                    executor.execute(reconcileTask);
+                    executor.execute(new ReconciliationTask(Uint64.valueOf(nodeId), nodeKey));
                 }
             });
             ReconcileOutput reconcilingInProgress = new ReconcileOutputBuilder()
@@ -150,9 +190,26 @@ public class ReconciliationServiceImpl implements ReconciliationService, AutoClo
     }
 
     private List<Long> getAllNodes() {
-        List<OFNode> nodeList = ShellUtil.getAllNodes(nodeListener);
-        List<Long> nodes = nodeList.stream().distinct().map(OFNode::getNodeId).collect(Collectors.toList());
-        return nodes;
+        return dpnTracker.currentNodes().stream().distinct().map(OFNode::getNodeId).collect(Collectors.toList());
+    }
+
+    /**
+     * Method gets the alarm text for the nodeId.
+     *
+     * @param nodeId Source of the alarm nodeId
+     * @param event reason for alarm invoke/clear
+     */
+    private static @NonNull String getAlarmText(final Long nodeId, final String event) {
+        return "OF Switch " + nodeId + event;
+    }
+
+    /**
+     * Method gets the source text for the nodeId.
+     *
+     * @param nodeId Source of the alarm nodeId
+     */
+    private static String getSourceText(final Long nodeId) {
+        return "Dpn=" + nodeId;
     }
 
     private final class ReconciliationTask implements Runnable {
@@ -187,7 +244,11 @@ public class ReconciliationServiceImpl implements ReconciliationService, AutoClo
                 updateReconciliationState(FAILED);
                 LOG.error("Error occurred while invoking reconcile RPC for node {}", nodeId, e);
             } finally {
-                alarmAgent.clearNodeReconciliationAlarm(nodeId.longValue());
+                final var dpnId = nodeId.longValue();
+                final var alarmText = getAlarmText(dpnId, " finished reconciliation");
+                final var source = getSourceText(dpnId);
+                LOG.debug("Clearing NodeReconciliationOperationOngoing alarm of source {}", source);
+                alarm.clearAlarm("NodeReconciliationOperationOngoing", alarmText, source);
             }
         }
 
@@ -238,8 +299,7 @@ public class ReconciliationServiceImpl implements ReconciliationService, AutoClo
         }
 
         private void updateReconciliationState(final ReconciliationState.ReconciliationStatus status) {
-            ReconciliationState state = new ReconciliationState(status, LocalDateTime.now());
-            reconciliationStates.put(nodeId.toString(),state);
+            reconciliationStates.put(nodeId.toString(), new ReconciliationState(status, LocalDateTime.now()));
         }
     }
 }