Merge "OPNFLWPLUG-1087: ODL controller to provide view of openflow node reconciliatio...
authorArunprakash D <d.arunprakash@ericsson.com>
Thu, 16 Apr 2020 07:15:06 +0000 (07:15 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 16 Apr 2020 07:15:06 +0000 (07:15 +0000)
21 files changed:
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/FlowNodeReconciliation.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ReconciliationJMXAgent.java [new file with mode: 0644]
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ReconciliationJMXService.java [new file with mode: 0644]
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ReconciliationJMXServiceMBean.java [new file with mode: 0644]
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/DeviceMastershipManager.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/ForwardingRulesManagerImpl.java
applications/forwardingrules-manager/src/main/resources/OSGI-INF/blueprint/forwardingrules-manager.xml
applications/forwardingrules-manager/src/test/java/test/mock/FlowListenerTest.java
applications/forwardingrules-manager/src/test/java/test/mock/GroupListenerTest.java
applications/forwardingrules-manager/src/test/java/test/mock/MeterListenerTest.java
applications/forwardingrules-manager/src/test/java/test/mock/NodeListenerTest.java
applications/forwardingrules-manager/src/test/java/test/mock/TableFeaturesListenerTest.java
applications/southbound-cli/src/main/java/org/opendaylight/openflowplugin/applications/southboundcli/ReconciliationServiceImpl.java
applications/southbound-cli/src/main/java/org/opendaylight/openflowplugin/applications/southboundcli/cli/GetReconciliationStateProvider.java [new file with mode: 0644]
applications/southbound-cli/src/main/resources/OSGI-INF/blueprint/commands.xml [new file with mode: 0644]
applications/southbound-cli/src/main/resources/OSGI-INF/blueprint/southbound-cli.xml
applications/southbound-cli/src/main/yang/reconciliation.yang
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/FlowGroupCacheManager.java [new file with mode: 0644]
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/ReconciliationState.java [new file with mode: 0644]
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/cache/FlowGroupCacheManagerImpl.java [new file with mode: 0644]

index f408fe21e60560d8930a1c98efcd131fe38ee3a7..92d4c7895b8ccae11300cc0bc6f84b9aa3485af9 100644 (file)
@@ -23,4 +23,6 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
  */
 public interface FlowNodeReconciliation extends ReconciliationNotificationListener, AutoCloseable {
     ListenableFuture<Boolean> reconcileConfiguration(InstanceIdentifier<FlowCapableNode> connectedNode);
+
+    void flowNodeDisconnected(InstanceIdentifier<FlowCapableNode> disconnectedNode);
 }
diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ReconciliationJMXAgent.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ReconciliationJMXAgent.java
new file mode 100644 (file)
index 0000000..2cb492c
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2020 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frm;
+
+import java.lang.management.ManagementFactory;
+import javax.inject.Inject;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class ReconciliationJMXAgent {
+    private static final Logger LOG = LoggerFactory.getLogger(ReconciliationJMXAgent.class);
+    private static final String OF_RECONC_BEANNAME
+            = "org.opendaylight.openflowplugin.frm:type=ReconciliationState";
+    private MBeanServer mbs = null;
+    private ObjectName objectName = null;
+
+    @Inject
+    public ReconciliationJMXAgent(final ReconciliationJMXService reconciliationJMXService) {
+        mbs = ManagementFactory.getPlatformMBeanServer();
+        try {
+            objectName = new ObjectName(OF_RECONC_BEANNAME);
+            registerReconciliationMbean(reconciliationJMXService);
+        } catch (MalformedObjectNameException e) {
+            LOG.error("ObjectName instance creation failed for Mbean {} : ", OF_RECONC_BEANNAME, e);
+        }
+    }
+
+    public void registerReconciliationMbean(ReconciliationJMXService reconciliationJMXService) {
+        try {
+            // Uniquely identify the MBeans and register them with the platform MBeanServer
+            if (!mbs.isRegistered(objectName)) {
+                mbs.registerMBean(reconciliationJMXService, objectName);
+                LOG.debug("Registered Mbean {} successfully", OF_RECONC_BEANNAME);
+            }
+        } catch (MBeanRegistrationException | InstanceAlreadyExistsException | NotCompliantMBeanException e) {
+            LOG.error("Registeration failed for Mbean {} : ", OF_RECONC_BEANNAME , e);
+        }
+    }
+}
diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ReconciliationJMXService.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ReconciliationJMXService.java
new file mode 100644 (file)
index 0000000..67c33d0
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2020 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frm;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.inject.Inject;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
+
+public class ReconciliationJMXService implements ReconciliationJMXServiceMBean {
+    private FlowGroupCacheManager flowGroupCacheManager;
+
+    @Inject
+    public ReconciliationJMXService(final FlowGroupCacheManager floGroupCacheManager) {
+        this.flowGroupCacheManager = floGroupCacheManager;
+    }
+
+    @Override
+    public Map<String, String> acquireReconciliationStates() {
+        Map<String, String> reconciliationStatesMap = new HashMap<>();
+        flowGroupCacheManager.getReconciliationStates().forEach((datapathId, reconciliationState) ->
+                reconciliationStatesMap.put(datapathId, reconciliationState.toString()));
+        return reconciliationStatesMap;
+    }
+}
diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ReconciliationJMXServiceMBean.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ReconciliationJMXServiceMBean.java
new file mode 100644 (file)
index 0000000..5f78734
--- /dev/null
@@ -0,0 +1,18 @@
+/*
+ * Copyright (c) 2020 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frm;
+
+import java.util.Map;
+
+public interface ReconciliationJMXServiceMBean {
+    /* The MapKey is datapathId
+       The MapValue is ReconciliationStates of the respective nodes.
+     */
+    Map<String, String> acquireReconciliationStates();
+}
\ No newline at end of file
index 4194a540239fc9c6a47fd230335d4ef3b2b73ea6..ede9d4845b1462eeefa4427f50cc2d1f84768692 100644 (file)
@@ -134,6 +134,7 @@ public class DeviceMastershipManager implements ClusteredDataTreeChangeListener<
                     if (activeNodes.contains(nodeIdent)) {
                         Set<InstanceIdentifier<FlowCapableNode>> set = Sets.newHashSet(activeNodes);
                         set.remove(nodeIdent);
+                        reconcliationAgent.flowNodeDisconnected(nodeIdent);
                         activeNodes = Collections.unmodifiableSet(set);
                         setNodeOperationalStatus(nodeIdent, false);
                     }
index a3b920f4d841e7378c339085869a6de13cf4053d..5976cbb3533b5d39ff31804311cd462bd2688467 100644 (file)
@@ -7,6 +7,10 @@
  */
 package org.opendaylight.openflowplugin.applications.frm.impl;
 
+import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.COMPLETED;
+import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.FAILED;
+import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.STARTED;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FluentFuture;
@@ -18,6 +22,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.math.BigInteger;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -40,6 +45,8 @@ import org.opendaylight.mdsal.binding.api.ReadTransaction;
 import org.opendaylight.mdsal.binding.api.WriteTransaction;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
+import org.opendaylight.openflowplugin.api.openflow.ReconciliationState;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
@@ -136,9 +143,11 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
 
     private static final AtomicLong BUNDLE_ID = new AtomicLong();
     private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
+    private Map<String, ReconciliationState> reconciliationStates;
 
     public FlowNodeReconciliationImpl(final ForwardingRulesManager manager, final DataBroker db,
-            final String serviceName, final int priority, final ResultState resultState) {
+                                      final String serviceName, final int priority, final ResultState resultState,
+                                      final FlowGroupCacheManager flowGroupCacheManager) {
         this.provider = Preconditions.checkNotNull(manager, "ForwardingRulesManager can not be null!");
         dataBroker = Preconditions.checkNotNull(db, "DataBroker can not be null!");
         this.serviceName = serviceName;
@@ -146,6 +155,7 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
         this.resultState = resultState;
         salBundleService = Preconditions.checkNotNull(manager.getSalBundleService(),
                 "salBundleService can not be null!");
+        reconciliationStates = flowGroupCacheManager.getReconciliationStates();
     }
 
     @Override
@@ -175,6 +185,13 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
         }
     }
 
+    @Override
+    public void flowNodeDisconnected(InstanceIdentifier<FlowCapableNode> disconnectedNode) {
+        String node = disconnectedNode.firstKeyOf(Node.class).getId().getValue();
+        BigInteger dpnId = getDpnIdFromNodeName(node);
+        reconciliationStates.remove(dpnId.toString());
+    }
+
     private class BundleBasedReconciliationTask implements Callable<Boolean> {
         final InstanceIdentifier<FlowCapableNode> nodeIdentity;
 
@@ -195,102 +212,108 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
             } catch (ExecutionException | InterruptedException e) {
                 LOG.error("Error occurred while reading the configuration data store for node {}", nodeIdentity, e);
             }
-            try {
-                if (flowNode.isPresent()) {
-                    LOG.debug("FlowNode present for Datapath ID {}", dpnId);
-                    OF_EVENT_LOG.debug("Bundle Reconciliation Start, Node: {}", dpnId);
-                    final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
-
-                    final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
-                            .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
-                            .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
-
-                    final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
-                            .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
-                            .setType(BundleControlType.ONFBCTOPENREQUEST).build();
-
-                    final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
-                            .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
-                            .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
-
-                    final AddBundleMessagesInput deleteAllFlowGroupsInput = new AddBundleMessagesInputBuilder()
-                            .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
-                            .setMessages(createMessages(nodeRef)).build();
-
-                    LOG.debug("Closing openflow bundle for device {}", dpnId);
-                    /* Close previously opened bundle on the openflow switch if any */
-                    ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle
-                            = salBundleService.controlBundle(closeBundleInput);
-
-                    /* Open a new bundle on the switch */
-                    ListenableFuture<RpcResult<ControlBundleOutput>> openBundle =
+
+            if (flowNode.isPresent()) {
+                ReconciliationState reconciliationState = new ReconciliationState(
+                        STARTED, LocalDateTime.now());
+                //put the dpn info into the map
+                reconciliationStates.put(dpnId.toString(), reconciliationState);
+                LOG.debug("FlowNode present for Datapath ID {}", dpnId);
+                OF_EVENT_LOG.debug("Bundle Reconciliation Start, Node: {}", dpnId);
+                final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
+
+                final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
+                        .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+                        .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
+
+                final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
+                        .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+                        .setType(BundleControlType.ONFBCTOPENREQUEST).build();
+
+                final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
+                        .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+                        .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
+
+                final AddBundleMessagesInput deleteAllFlowGroupsInput = new AddBundleMessagesInputBuilder()
+                        .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+                        .setMessages(createMessages(nodeRef)).build();
+
+                LOG.debug("Closing openflow bundle for device {}", dpnId);
+                /* Close previously opened bundle on the openflow switch if any */
+                ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle
+                        = salBundleService.controlBundle(closeBundleInput);
+
+                /* Open a new bundle on the switch */
+                ListenableFuture<RpcResult<ControlBundleOutput>> openBundle =
                         Futures.transformAsync(closeBundle,
                             rpcResult -> salBundleService.controlBundle(openBundleInput),
                                 service);
 
                     /* Push groups and flows via bundle add messages */
-                    ListenableFuture<RpcResult<AddBundleMessagesOutput>> deleteAllFlowGroupsFuture
-                            = Futures.transformAsync(openBundle, rpcResult -> {
-                                if (rpcResult.isSuccessful()) {
-                                    return salBundleService.addBundleMessages(deleteAllFlowGroupsInput);
-                                }
-                                return Futures.immediateFuture(null);
-                            }, service);
-
-                    /* Push flows and groups via bundle add messages */
-                    Optional<FlowCapableNode> finalFlowNode = flowNode;
-                    ListenableFuture<List<RpcResult<AddBundleMessagesOutput>>> addbundlesFuture
-                            = Futures.transformAsync(deleteAllFlowGroupsFuture, rpcResult -> {
-                                if (rpcResult.isSuccessful()) {
-                                    LOG.debug("Adding delete all flow/group message is successful for device {}",dpnId);
-                                    return Futures.allAsList(addBundleMessages(finalFlowNode.get(), bundleIdValue,
-                                            nodeIdentity));
-                                }
-                                return Futures.immediateFuture(null);
-                            }, service);
+                ListenableFuture<RpcResult<AddBundleMessagesOutput>> deleteAllFlowGroupsFuture
+                        = Futures.transformAsync(openBundle, rpcResult -> {
+                            if (rpcResult.isSuccessful()) {
+                                return salBundleService.addBundleMessages(deleteAllFlowGroupsInput);
+                            }
+                            return Futures.immediateFuture(null);
+                        }, service);
+
+                /* Push flows and groups via bundle add messages */
+                Optional<FlowCapableNode> finalFlowNode = flowNode;
+                ListenableFuture<List<RpcResult<AddBundleMessagesOutput>>> addbundlesFuture
+                        = Futures.transformAsync(deleteAllFlowGroupsFuture, rpcResult -> {
+                            if (rpcResult.isSuccessful()) {
+                                LOG.debug("Adding delete all flow/group message is successful for device {}", dpnId);
+                                return Futures.allAsList(addBundleMessages(finalFlowNode.get(), bundleIdValue,
+                                        nodeIdentity));
+                            }
+                            return Futures.immediateFuture(null);
+                        }, service);
 
                     /* Commit the bundle on the openflow switch */
-                    ListenableFuture<RpcResult<ControlBundleOutput>> commitBundleFuture = Futures.transformAsync(
-                            addbundlesFuture, rpcResult -> {
+                ListenableFuture<RpcResult<ControlBundleOutput>> commitBundleFuture
+                        = Futures.transformAsync(addbundlesFuture, rpcResult -> {
                             LOG.debug("Adding bundle messages completed for device {}", dpnId);
                             return salBundleService.controlBundle(commitBundleInput);
                         }, service);
 
-                    /* Bundles not supported for meters */
-                    List<Meter> meters = flowNode.get().getMeter() != null ? flowNode.get().getMeter()
-                            : Collections.emptyList();
-                    Futures.transformAsync(commitBundleFuture,
-                        rpcResult -> {
-                            if (rpcResult.isSuccessful()) {
-                                for (Meter meter : meters) {
-                                    final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdentity
-                                            .child(Meter.class, meter.key());
-                                    provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity);
-                                }
+                /* Bundles not supported for meters */
+                List<Meter> meters = flowNode.get().getMeter() != null ? flowNode.get().getMeter()
+                        : Collections.emptyList();
+                Futures.transformAsync(commitBundleFuture,
+                    rpcResult -> {
+                        if (rpcResult.isSuccessful()) {
+                            for (Meter meter : meters) {
+                                final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdentity
+                                        .child(Meter.class, meter.key());
+                                provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity);
                             }
-                            return Futures.immediateFuture(null);
-                        }, service);
-                    try {
-                        RpcResult<ControlBundleOutput> bundleFuture = commitBundleFuture.get();
-                        if (bundleFuture != null && bundleFuture.isSuccessful()) {
-                            LOG.debug("Completing bundle based reconciliation for device ID:{}", dpnId);
-                            OF_EVENT_LOG.debug("Bundle Reconciliation Finish, Node: {}", dpnId);
-                            return true;
-                        } else {
-                            LOG.error("commit bundle failed for device {} with error {}", dpnId,
-                                    commitBundleFuture.get().getErrors());
-                            return false;
                         }
-                    } catch (InterruptedException | ExecutionException e) {
-                        LOG.error("Error while doing bundle based reconciliation for device ID:{}", dpnId);
+                        return Futures.immediateFuture(null);
+                    }, service);
+                try {
+                    RpcResult<ControlBundleOutput> bundleFuture = commitBundleFuture.get();
+                    if (bundleFuture != null && bundleFuture.isSuccessful()) {
+                        reconciliationState.setState(COMPLETED, LocalDateTime.now());
+                        LOG.debug("Completing bundle based reconciliation for device ID:{}", dpnId);
+                        OF_EVENT_LOG.debug("Bundle Reconciliation Finish, Node: {}", dpnId);
+                        return true;
+                    } else {
+                        reconciliationState.setState(FAILED, LocalDateTime.now());
+                        LOG.error("commit bundle failed for device {} with error {}", dpnId,
+                                commitBundleFuture.get().getErrors());
                         return false;
                     }
+                } catch (InterruptedException | ExecutionException e) {
+                    reconciliationState.setState(FAILED, LocalDateTime.now());
+                    LOG.error("commit bundle failed for device {} with error ", dpnId, e);
+                    return false;
+                } finally {
+                    service.shutdown();
                 }
-                LOG.error("FlowNode not present for Datapath ID {}", dpnId);
-                return false;
-            }  finally {
-                service.shutdown();
             }
+            LOG.error("FlowNode not present for Datapath ID {}", dpnId);
+            return false;
         }
     }
 
@@ -356,6 +379,11 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                 /* Tables - have to be pushed before groups */
                 // CHECK if while pushing the update, updateTableInput can be null to emulate a
                 // table add
+                ReconciliationState reconciliationState = new ReconciliationState(
+                        STARTED, LocalDateTime.now());
+                //put the dpn info into the map
+                reconciliationStates.put(dpnId.toString(), reconciliationState);
+                LOG.debug("Triggering reconciliation for node {} with state: {}", dpnId, STARTED);
                 List<TableFeatures> tableList = flowNode.get().getTableFeatures() != null
                         ? flowNode.get().getTableFeatures()
                         : Collections.<TableFeatures>emptyList();
@@ -496,6 +524,7 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                         provider.getFlowCommiter().add(flowIdent, flow, nodeIdentity);
                     }
                 }
+                reconciliationState.setState(COMPLETED, LocalDateTime.now());
                 OF_EVENT_LOG.debug("Reconciliation Finish, Node: {}, flow count: {}", dpnId, flowCount);
             }
             return true;
index db6da55a5b28038055a71d98fc0c91229721573f..30ea3e27d3d89524104b841eccecd4964b7bb2a2 100644 (file)
@@ -25,6 +25,7 @@ import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
 import org.opendaylight.mdsal.binding.api.RpcProviderService;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
 import org.opendaylight.openflowplugin.applications.frm.BundleMessagesCommiter;
@@ -105,6 +106,7 @@ public class ForwardingRulesManagerImpl implements ForwardingRulesManager {
     private boolean isBundleBasedReconciliationEnabled;
     private final OpenflowServiceRecoveryHandler openflowServiceRecoveryHandler;
     private final ServiceRecoveryRegistry serviceRecoveryRegistry;
+    private final FlowGroupCacheManager flowGroupCacheManager;
 
     @Inject
     public ForwardingRulesManagerImpl(@Reference final DataBroker dataBroker,
@@ -116,7 +118,8 @@ public class ForwardingRulesManagerImpl implements ForwardingRulesManager {
                                       @Reference final ConfigurationService configurationService,
                                       @Reference final ReconciliationManager reconciliationManager,
                                       final OpenflowServiceRecoveryHandler openflowServiceRecoveryHandler,
-                                      @Reference final ServiceRecoveryRegistry serviceRecoveryRegistry) {
+                                      @Reference final ServiceRecoveryRegistry serviceRecoveryRegistry,
+                                      @Reference final FlowGroupCacheManager flowGroupCacheManager) {
         disableReconciliation = config.isDisableReconciliation();
         staleMarkingEnabled = config.isStaleMarkingEnabled();
         reconciliationRetryCount = config.getReconciliationRetryCount().toJava();
@@ -128,6 +131,7 @@ public class ForwardingRulesManagerImpl implements ForwardingRulesManager {
         this.reconciliationManager = reconciliationManager;
         this.rpcProviderService = rpcProviderService;
         this.mastershipChangeServiceManager = mastershipChangeServiceManager;
+        this.flowGroupCacheManager = flowGroupCacheManager;
 
         Preconditions.checkArgument(rpcRegistry != null, "RpcProviderRegistry can not be null !");
 
@@ -155,9 +159,8 @@ public class ForwardingRulesManagerImpl implements ForwardingRulesManager {
     public void start() {
         nodeConfigurator = new NodeConfiguratorImpl();
         this.devicesGroupRegistry = new DevicesGroupRegistry();
-
         this.nodeListener = new FlowNodeReconciliationImpl(this, dataService, SERVICE_NAME, FRM_RECONCILIATION_PRIORITY,
-                ResultState.DONOTHING);
+                ResultState.DONOTHING, flowGroupCacheManager);
         if (this.isReconciliationDisabled()) {
             LOG.debug("Reconciliation is disabled by user");
         } else {
index b69d74b2eba012570b1a1eefb6e71bcc2cf3e7d5..438fdd64584d80dae506e37668a99961857fbccb 100644 (file)
@@ -7,4 +7,19 @@
   <odl:clustered-app-config id="forwardingRulesManagerConfig"
         binding-class="org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.forwardingrules.manager.config.rev160511.ForwardingRulesManagerConfig"/>
 
+  <bean id="reconciliationJMXService"
+        class="org.opendaylight.openflowplugin.applications.frm.ReconciliationJMXService">
+    <argument ref="flowGroupCacheManager"/>
+  </bean>
+  <bean id="reconciliationJMXAgent"
+        class="org.opendaylight.openflowplugin.applications.frm.ReconciliationJMXAgent">
+    <argument ref="reconciliationJMXService"/>
+  </bean>
+  <bean id="reconciliationJMXServiceMBean"
+        class="org.opendaylight.openflowplugin.applications.frm.ReconciliationJMXService">
+    <argument ref="flowGroupCacheManager"/>
+  </bean>
+
+  <service ref="reconciliationJMXServiceMBean" interface="org.opendaylight.openflowplugin.applications.frm.ReconciliationJMXServiceMBean"/>
+
 </blueprint>
index ff49d05e964e17cfb77007563d03d1f26f892dbd..1e49669077f1a025a3186b22c23b0f42478f5f48 100644 (file)
@@ -23,6 +23,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.mdsal.binding.api.WriteTransaction;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
 import org.opendaylight.openflowplugin.applications.frm.impl.DeviceMastershipManager;
 import org.opendaylight.openflowplugin.applications.frm.impl.ForwardingRulesManagerImpl;
@@ -76,14 +77,15 @@ public class FlowListenerTest extends FRMTest {
     private ServiceRecoveryRegistry serviceRecoveryRegistry;
     @Mock
     private MastershipChangeServiceManager mastershipChangeServiceManager;
+    @Mock
+    private FlowGroupCacheManager flowGroupCacheManager;
 
     @Before
     public void setUp() {
         forwardingRulesManager = new ForwardingRulesManagerImpl(getDataBroker(), rpcProviderRegistryMock,
                 rpcProviderRegistryMock, getConfig(), mastershipChangeServiceManager, clusterSingletonService,
                 getConfigurationService(), reconciliationManager, openflowServiceRecoveryHandler,
-                serviceRecoveryRegistry);
-
+                serviceRecoveryRegistry, flowGroupCacheManager);
         forwardingRulesManager.start();
         // TODO consider tests rewrite (added because of complicated access)
         forwardingRulesManager.setDeviceMastershipManager(deviceMastershipManager);
index bc892432f558d7cddf8227e9bb317194a50d5e2f..f7614879dcbdeef2a846fe294870a7ee1bb459d2 100644 (file)
@@ -22,6 +22,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.mdsal.binding.api.WriteTransaction;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
 import org.opendaylight.openflowplugin.applications.frm.impl.DeviceMastershipManager;
 import org.opendaylight.openflowplugin.applications.frm.impl.ForwardingRulesManagerImpl;
@@ -66,6 +67,9 @@ public class GroupListenerTest extends FRMTest {
     private ServiceRecoveryRegistry serviceRecoveryRegistry;
     @Mock
     private MastershipChangeServiceManager mastershipChangeServiceManager;
+    @Mock
+    private FlowGroupCacheManager flowGroupCacheManager;
+
 
     @Before
     public void setUp() {
@@ -79,7 +83,8 @@ public class GroupListenerTest extends FRMTest {
                 getConfigurationService(),
                 reconciliationManager,
                 openflowServiceRecoveryHandler,
-                serviceRecoveryRegistry);
+                serviceRecoveryRegistry,
+                flowGroupCacheManager);
 
         forwardingRulesManager.start();
         // TODO consider tests rewrite (added because of complicated access)
index 6ecd880336610131735c887fc81679eab4c23129..1fade0fdfbc722feab5bb9129d1e53f3433035e4 100644 (file)
@@ -20,6 +20,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.mdsal.binding.api.WriteTransaction;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
 import org.opendaylight.openflowplugin.applications.frm.impl.DeviceMastershipManager;
 import org.opendaylight.openflowplugin.applications.frm.impl.ForwardingRulesManagerImpl;
@@ -64,6 +65,9 @@ public class MeterListenerTest extends FRMTest {
     private ServiceRecoveryRegistry serviceRecoveryRegistry;
     @Mock
     private MastershipChangeServiceManager mastershipChangeServiceManager;
+    @Mock
+    private FlowGroupCacheManager flowGroupCacheManager;
+
 
     @Before
     public void setUp() {
@@ -77,7 +81,8 @@ public class MeterListenerTest extends FRMTest {
                 getConfigurationService(),
                 reconciliationManager,
                 openflowServiceRecoveryHandler,
-                serviceRecoveryRegistry);
+                serviceRecoveryRegistry,
+                flowGroupCacheManager);
 
         forwardingRulesManager.start();
         // TODO consider tests rewrite (added because of complicated access)
index bec4dee8faf194e3c5d0aa895b0003ae693c84ee..516bed903fd28ea69eaa57007439bb90864c7382 100644 (file)
@@ -17,6 +17,7 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
 import org.opendaylight.openflowplugin.applications.frm.impl.ForwardingRulesManagerImpl;
 import org.opendaylight.openflowplugin.applications.frm.recovery.OpenflowServiceRecoveryHandler;
@@ -46,6 +47,9 @@ public class NodeListenerTest extends FRMTest {
     private ServiceRecoveryRegistry serviceRecoveryRegistry;
     @Mock
     private MastershipChangeServiceManager mastershipChangeServiceManager;
+    @Mock
+    private FlowGroupCacheManager flowGroupCacheManager;
+
 
     @Before
     public void setUp() {
@@ -59,9 +63,8 @@ public class NodeListenerTest extends FRMTest {
                 getConfigurationService(),
                 reconciliationManager,
                 openflowServiceRecoveryHandler,
-                serviceRecoveryRegistry);
-
-
+                serviceRecoveryRegistry,
+                flowGroupCacheManager);
         forwardingRulesManager.start();
     }
 
index c0f02dae7cc723ade03c8cb6575425497db6ec1f..bb30caec52d521f40157bd88a850a83a6a7c3273 100644 (file)
@@ -20,6 +20,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.mdsal.binding.api.WriteTransaction;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
 import org.opendaylight.openflowplugin.applications.frm.impl.DeviceMastershipManager;
 import org.opendaylight.openflowplugin.applications.frm.impl.ForwardingRulesManagerImpl;
@@ -59,14 +60,15 @@ public class TableFeaturesListenerTest extends FRMTest {
     private ServiceRecoveryRegistry serviceRecoveryRegistry;
     @Mock
     private MastershipChangeServiceManager mastershipChangeServiceManager;
+    @Mock
+    private FlowGroupCacheManager flowGroupCacheManager;
 
     @Before
     public void setUp() {
         forwardingRulesManager = new ForwardingRulesManagerImpl(getDataBroker(), rpcProviderRegistryMock,
                 rpcProviderRegistryMock, getConfig(), mastershipChangeServiceManager, clusterSingletonService,
                 getConfigurationService(), reconciliationManager, openflowServiceRecoveryHandler,
-                serviceRecoveryRegistry);
-
+                serviceRecoveryRegistry, flowGroupCacheManager);
         forwardingRulesManager.start();
         // TODO consider tests rewrite (added because of complicated access)
         forwardingRulesManager.setDeviceMastershipManager(deviceMastershipManager);
index d1a0ede63e260891140ebdf20e3250516433d732..30bfd73598343c3b66c6baad480709bc338ec7ba 100644 (file)
@@ -9,27 +9,31 @@
 package org.opendaylight.openflowplugin.applications.southboundcli;
 
 import static java.util.Objects.requireNonNull;
-import static org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.NodeReconcileState.State.COMPLETED;
-import static org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.NodeReconcileState.State.FAILED;
-import static org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.NodeReconcileState.State.INPROGRESS;
+import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.COMPLETED;
+import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.FAILED;
+import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.STARTED;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.math.BigInteger;
 import java.text.SimpleDateFormat;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import org.opendaylight.mdsal.binding.api.DataBroker;
-import org.opendaylight.mdsal.binding.api.ReadTransaction;
 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
+import org.opendaylight.openflowplugin.api.openflow.ReconciliationState;
 import org.opendaylight.openflowplugin.applications.southboundcli.alarm.AlarmAgent;
 import org.opendaylight.openflowplugin.applications.southboundcli.util.OFNode;
 import org.opendaylight.openflowplugin.applications.southboundcli.util.ShellUtil;
@@ -43,19 +47,14 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.NodeReconcileState.State;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconciliationCounter;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconciliationService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconciliationState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounter;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounterBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounterKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.state.ReconciliationStateList;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.state.ReconciliationStateListBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.state.ReconciliationStateListKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -75,13 +74,16 @@ public class ReconciliationServiceImpl implements ReconciliationService, AutoClo
     private final Long startCount = 1L;
     private final int threadPoolSize = 10;
     private final ExecutorService executor = Executors.newWorkStealingPool(threadPoolSize);
+    private volatile Map<String, ReconciliationState> reconciliationStates = new ConcurrentHashMap();
 
     public ReconciliationServiceImpl(final DataBroker broker, final FrmReconciliationService frmReconciliationService,
-                                     final AlarmAgent alarmAgent, final NodeListener nodeListener) {
+                                     final AlarmAgent alarmAgent, final NodeListener nodeListener,
+                                     final FlowGroupCacheManager flowGroupCacheManager) {
         this.broker = broker;
         this.frmReconciliationService = frmReconciliationService;
         this.alarmAgent = alarmAgent;
         this.nodeListener = requireNonNull(nodeListener, "NodeListener cannot be null!");
+        reconciliationStates = flowGroupCacheManager.getReconciliationStates();
     }
 
     @Override
@@ -118,12 +120,12 @@ public class ReconciliationServiceImpl implements ReconciliationService, AutoClo
             }
             List<Uint64> inprogressNodes = new ArrayList<>();
             nodesToReconcile.parallelStream().forEach(nodeId -> {
-                Optional<ReconciliationStateList> state = getReconciliationState(nodeId);
-                if (state.isPresent() && state.get().getState().equals(INPROGRESS)) {
+                ReconciliationState state = getReconciliationState(nodeId);
+                if (state != null && state.getState().equals(STARTED)) {
                     inprogressNodes.add(Uint64.valueOf(nodeId));
                 } else {
                     alarmAgent.raiseNodeReconciliationAlarm(nodeId);
-                    LOG.info("Executing reconciliation for node {}", nodeId);
+                    LOG.info("Executing reconciliation for node {} with state ", nodeId);
                     NodeKey nodeKey = new NodeKey(new NodeId("openflow:" + nodeId));
                     ReconciliationTask reconcileTask = new ReconciliationTask(new BigInteger(String.valueOf(nodeId)),
                             nodeKey);
@@ -141,17 +143,8 @@ public class ReconciliationServiceImpl implements ReconciliationService, AutoClo
         }
     }
 
-    private Optional<ReconciliationStateList> getReconciliationState(final Long nodeId) {
-        InstanceIdentifier<ReconciliationStateList> instanceIdentifier = InstanceIdentifier
-                .builder(ReconciliationState.class).child(ReconciliationStateList.class,
-                        new ReconciliationStateListKey(new BigInteger(String.valueOf(nodeId)))).build();
-        try (ReadTransaction tx = broker.newReadOnlyTransaction()) {
-            return tx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier).get();
-
-        } catch (InterruptedException  | ExecutionException e) {
-            LOG.error("Exception while reading reconciliation state for {}", nodeId, e);
-        }
-        return Optional.empty();
+    private ReconciliationState getReconciliationState(final Long nodeId) {
+        return reconciliationStates.get(nodeId.toString());
     }
 
     private ListenableFuture<RpcResult<ReconcileOutput>> buildErrorResponse(String msg) {
@@ -183,7 +176,7 @@ public class ReconciliationServiceImpl implements ReconciliationService, AutoClo
             ReconcileNodeInput reconInput = new ReconcileNodeInputBuilder()
                     .setNodeId(nodeId).setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
                             .child(Node.class, nodeKey).build())).build();
-            updateReconciliationState(INPROGRESS);
+            updateReconciliationState(STARTED);
             Future<RpcResult<ReconcileNodeOutput>> reconOutput = frmReconciliationService
                     .reconcileNode(reconInput);
             try {
@@ -252,20 +245,10 @@ public class ReconciliationServiceImpl implements ReconciliationService, AutoClo
             return Optional.empty();
         }
 
-        private void updateReconciliationState(State state) {
-            ReadWriteTransaction tx = broker.newReadWriteTransaction();
-            InstanceIdentifier<ReconciliationStateList> instanceIdentifier = InstanceIdentifier
-                    .builder(ReconciliationState.class).child(ReconciliationStateList.class,
-                            new ReconciliationStateListKey(nodeId)).build();
-            ReconciliationStateListBuilder stateBuilder = new ReconciliationStateListBuilder()
-                    .withKey(new ReconciliationStateListKey(nodeId))
-                    .setState(state);
-            try {
-                tx.merge(LogicalDatastoreType.OPERATIONAL, instanceIdentifier, stateBuilder.build(), true);
-                tx.commit().get();
-            } catch (InterruptedException | ExecutionException e) {
-                LOG.error("Exception while updating reconciliation state: {}", nodeId, e);
-            }
+
+        private void updateReconciliationState(ReconciliationState.ReconciliationStatus status) {
+            ReconciliationState state = new ReconciliationState(status, LocalDateTime.now());
+            reconciliationStates.put(nodeId.toString(),state);
         }
     }
 }
diff --git a/applications/southbound-cli/src/main/java/org/opendaylight/openflowplugin/applications/southboundcli/cli/GetReconciliationStateProvider.java b/applications/southbound-cli/src/main/java/org/opendaylight/openflowplugin/applications/southboundcli/cli/GetReconciliationStateProvider.java
new file mode 100644 (file)
index 0000000..2a6b17c
--- /dev/null
@@ -0,0 +1,219 @@
+/*
+ * Copyright (c) 2020 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.southboundcli.cli;
+
+import com.google.common.net.InetAddresses;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import com.google.gson.reflect.TypeToken;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.lang.reflect.Type;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.karaf.shell.commands.Command;
+import org.apache.karaf.shell.commands.Option;
+import org.apache.karaf.shell.console.OsgiCommandSupport;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.infrautils.diagstatus.ClusterMemberInfo;
+import org.opendaylight.openflowplugin.applications.frm.ReconciliationJMXServiceMBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Command(scope = "openflow", name = "getreconciliationstate",
+        description = "Print reconciliation state for all devices")
+public class GetReconciliationStateProvider extends OsgiCommandSupport {
+
+    @Option(name = "-d", description = "Node Id")
+    String nodeId;
+
+    private static final Logger LOG = LoggerFactory.getLogger(GetReconciliationStateProvider.class);
+    private static final String URL_PREFIX = "http://";
+    private static final String URL_SEPARATOR = "/";
+    private static final String URL_SEPARATOR_COLON = ":";
+    private static final String HTTP_JOL_OKIA_BASE_URI = "/jolokia/exec/";
+    private static final int HTTP_TIMEOUT = 5000;
+    private final Integer httpPort;
+    private static final String JMX_OBJECT_NAME
+            = "org.opendaylight.openflowplugin.frm:type=ReconciliationState";
+    private static final String JMX_ATTRIBUTE_NAME = "acquireReconciliationStates";
+    private static final String JMX_REST_HTTP_AUTH_UNAME_PWD = "admin:admin";
+    private ReconciliationJMXServiceMBean reconciliationJMXServiceMBean;
+    private ClusterMemberInfo clusterMemberInfoProvider;
+
+
+    public GetReconciliationStateProvider(final ReconciliationJMXServiceMBean reconciliationJMXServiceMBean,
+                                          final ClusterMemberInfo clusterMemberInfoProvider,
+                                          @Nullable Integer httpPort) {
+        this.reconciliationJMXServiceMBean = reconciliationJMXServiceMBean;
+        this.clusterMemberInfoProvider = clusterMemberInfoProvider;
+        this.httpPort = httpPort;
+    }
+
+    @Override
+    protected Object doExecute() throws Exception {
+        List<String> result = new ArrayList<>();
+        Map<String, String> reconciliationStates  = getClusterwideReconcilitionStates();
+        if (nodeId == null) {
+            if (!reconciliationStates.isEmpty()) {
+                reconciliationStates.forEach((datapathId, reconciliationState) -> {
+                    String status = String.format("%-17s %-50s", datapathId, reconciliationState);
+                    result.add(status);
+                });
+                printReconciliationStates(result);
+            } else {
+                session.getConsole().println("Reconciliation data not available");
+            }
+        }
+        else {
+            String reconciliationState = getReconciliationStateForNode();
+            if (reconciliationState != null) {
+                String status = String.format("%-17s %-50s", nodeId, reconciliationState);
+                result.add(status);
+                printReconciliationStates(result);
+            } else {
+                session.getConsole().println("Reconciliation data not available for the specified node");
+            }
+        }
+        return null;
+    }
+
+    private String getReconciliationStateForNode() {
+        //first checking reconciliation state locally
+        String reconciliationState = reconciliationJMXServiceMBean.acquireReconciliationStates().get(nodeId);
+        if (reconciliationState == null) {
+            //checking reconciliation state in the cluster
+            reconciliationState = getClusterwideReconcilitionStates().get(nodeId);
+        }
+        return reconciliationState;
+    }
+
+    private void printReconciliationStates(List<String> result) {
+        session.getConsole().println(getHeaderOutput());
+        session.getConsole().println(getLineSeparator());
+        result.stream().forEach(p -> session.getConsole().println(p));
+    }
+
+    private String getHeaderOutput() {
+        String header = String.format("%-17s %-25s %-25s", "DatapathId", "Reconciliation Status",
+                "Reconciliation Time");
+        return header;
+    }
+
+    private String getLineSeparator() {
+        return "-------------------------------------------------------------------";
+    }
+
+    @SuppressWarnings("IllegalCatch")
+    private Map<String,String> getClusterwideReconcilitionStates() {
+        Map<String,String>  clusterwideReconcStates = new HashMap<>();
+        List<String> clusterIPAddresses = clusterMemberInfoProvider.getClusterMembers().stream()
+                .map(s -> s.getHostAddress()).collect(Collectors.toList());
+        LOG.debug("The ip address of nodes in the cluster : {}", clusterIPAddresses);
+        if (!clusterIPAddresses.isEmpty()) {
+            String selfAddress = clusterMemberInfoProvider.getSelfAddress() != null
+                    ? clusterMemberInfoProvider.getSelfAddress().getHostAddress() : ("localhost");
+            LOG.trace("The ip address of local node is {}", selfAddress);
+            for (String memberAddress : clusterIPAddresses) {
+                try {
+                    if (memberAddress.equals(selfAddress)) {
+                        clusterwideReconcStates.putAll(getLocalStatusSummary());
+                    } else {
+                        clusterwideReconcStates.putAll(getRemoteReconciliationStates(memberAddress));
+                    }
+                } catch (Exception e) {
+                    LOG.error("Exception while reaching Host {}", memberAddress, e);
+                }
+            }
+        } else {
+            LOG.info("Could not obtain cluster members or the cluster-command is being executed locally\n");
+        }
+        return clusterwideReconcStates;
+    }
+
+    @SuppressWarnings("IllegalCatch")
+    private Map<String, String> getRemoteReconciliationStates(String ipAddress) {
+        Map<String, String> jmxReconciliationStates = new HashMap<>();
+        try {
+            String getReconcilationRemoteResponse = invokeRemoteRestOperation(ipAddress);
+            if (getReconcilationRemoteResponse != null) {
+                JsonElement rootObj = new JsonParser().parse(getReconcilationRemoteResponse);
+                String remoteJMXOperationResult = rootObj.getAsJsonObject().get("value").toString();
+                Type type = new TypeToken<HashMap<String, String>>() {}.getType();
+                jmxReconciliationStates.putAll(new Gson().fromJson(remoteJMXOperationResult, type));
+            }
+        } catch (Exception e) {
+            LOG.error("Exception during reconciliation states from device with ip address {}", ipAddress, e);
+        }
+        return jmxReconciliationStates;
+    }
+
+    private Map<String,String> getLocalStatusSummary() {
+        return reconciliationJMXServiceMBean.acquireReconciliationStates();
+    }
+
+    @SuppressFBWarnings("DM_DEFAULT_ENCODING")
+    private String invokeRemoteRestOperation(String ipAddress) throws Exception {
+        String restUrl = buildRemoteReconcilationUrl(ipAddress);
+        LOG.info("invokeRemoteReconcilationState() REST URL: {}", restUrl);
+        String authString = JMX_REST_HTTP_AUTH_UNAME_PWD;
+        byte[] authEncBytes = Base64.encodeBase64(authString.getBytes());
+        String authStringEnc = new String(authEncBytes);
+        HttpRequest request = HttpRequest.newBuilder(URI.create(restUrl))
+                                .timeout(Duration.ofMillis(HTTP_TIMEOUT))
+                                .header("Authorization","Basic " + authStringEnc)
+                                .header("Accept","application/json")
+                                .GET()
+                                .build();
+
+        LOG.debug("sending http request for accessing remote reconcilation");
+        HttpResponse<String> response = HttpClient.newBuilder()
+                                .connectTimeout(request.timeout().get().plusMillis(1000))
+                                .build()
+                                .send(request, HttpResponse.BodyHandlers.ofString());
+        // Response code for success should be 200
+        Integer httpResponseCode = response.statusCode();
+        LOG.debug("http response received for remote reconcilation {}", httpResponseCode);
+        String respStr = response.body();
+        if (httpResponseCode > 299) {
+            LOG.error("Non-200 http response code received {} for URL {}", httpResponseCode, restUrl);
+            if (respStr == null || respStr.isEmpty()) {
+                return "Service Status Retrieval failed. HTTP Response Code : " + httpResponseCode + "\n";
+            }
+        }
+        LOG.trace("HTTP Response is - {} for URL {}", respStr, restUrl);
+
+        return respStr;
+    }
+
+
+    String buildRemoteReconcilationUrl(String host) {
+        String targetHostAsString;
+        InetAddress hostInetAddress = InetAddresses.forString(host);
+        if (hostInetAddress instanceof Inet6Address) {
+            targetHostAsString = '[' + hostInetAddress.getHostAddress() + ']';
+        } else {
+            targetHostAsString = hostInetAddress.getHostAddress();
+        }
+        return new StringBuilder().append(URL_PREFIX).append(targetHostAsString).append(URL_SEPARATOR_COLON)
+                .append(httpPort).append(HTTP_JOL_OKIA_BASE_URI).append(JMX_OBJECT_NAME)
+                .append(URL_SEPARATOR).append(JMX_ATTRIBUTE_NAME).toString();
+    }
+}
\ No newline at end of file
diff --git a/applications/southbound-cli/src/main/resources/OSGI-INF/blueprint/commands.xml b/applications/southbound-cli/src/main/resources/OSGI-INF/blueprint/commands.xml
new file mode 100644 (file)
index 0000000..c8d1c4e
--- /dev/null
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
+           xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.2.0">
+
+    <reference id="reconciliationJMXServiceMBean"
+               interface="org.opendaylight.openflowplugin.applications.frm.ReconciliationJMXServiceMBean"
+               availability="optional"/>
+    <cm:property-placeholder persistent-id="org.ops4j.pax.web" update-strategy="none">
+        <cm:default-properties>
+            <cm:property name="org.osgi.service.http.port" value="8181"/>
+        </cm:default-properties>
+    </cm:property-placeholder>
+    <reference id="clusterMemberInfoProvider"
+               interface="org.opendaylight.infrautils.diagstatus.ClusterMemberInfo"
+               availability="optional"/>
+
+    <command-bundle xmlns="http://karaf.apache.org/xmlns/shell/v1.0.0">
+        <command name="openflow/getallnodes">
+            <action class="org.opendaylight.openflowplugin.applications.southboundcli.cli.GetAllNodesCommandProvider">
+                <property name="dataBroker" ref="dataBroker" />
+                <property name="nodeListener" ref="nodeListener"/>
+            </action>
+        </command>
+        <command name="openflow/shownode">
+            <action class="org.opendaylight.openflowplugin.applications.southboundcli.cli.ShowNodeCommandProvider">
+                <property name="dataBroker" ref="dataBroker" />
+            </action>
+        </command>
+        <command name="openflow/reconcile">
+            <action class="org.opendaylight.openflowplugin.applications.southboundcli.cli.Reconciliation">
+                <property name="reconciliationService" ref="reconciliationService"/>
+            </action>
+        </command>
+        <command name="openflow/getreconciliationcount">
+            <action class="org.opendaylight.openflowplugin.applications.southboundcli.cli.ReconciliationCount">
+                <property name="dataBroker" ref="dataBroker"/>
+            </action>
+        </command>
+        <command name="openflow/getreconciliationstate">
+            <action class="org.opendaylight.openflowplugin.applications.southboundcli.cli.GetReconciliationStateProvider">
+                <argument ref ="reconciliationJMXServiceMBean"/>
+                <argument ref="clusterMemberInfoProvider"/>
+                <argument value="${org.osgi.service.http.port}"/>
+            </action>
+        </command>
+    </command-bundle>
+
+</blueprint>
\ No newline at end of file
index 635c5319e6948cd8a7e6450f7f87e48989232d48..b4bce7b42abd16f16bd0bb0c671ff6cdbc6624b1 100644 (file)
@@ -6,6 +6,7 @@
 
     <reference id="dataBroker"
                interface="org.opendaylight.mdsal.binding.api.DataBroker"/>
+    <reference id="flowGroupCacheManager" interface="org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager"/>
 
     <odl:rpc-service id="frmReconciliationService"
              interface="org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.FrmReconciliationService"/>
     <bean id="alarmAgent"
           class="org.opendaylight.openflowplugin.applications.southboundcli.alarm.AlarmAgent" init-method="start">
     </bean>
+    <bean id="nodeListener"
+          class="org.opendaylight.openflowplugin.applications.southboundcli.NodeListener"
+          init-method="start"
+          destroy-method="close">
+        <argument ref="dataBroker"/>
+    </bean>
     <bean id="reconciliationService"
           class="org.opendaylight.openflowplugin.applications.southboundcli.ReconciliationServiceImpl"
           destroy-method="close">
         <argument ref="frmReconciliationService"/>
         <argument ref="alarmAgent"/>
         <argument ref="nodeListener"/>
-    </bean>
-    <bean id="nodeListener"
-          class="org.opendaylight.openflowplugin.applications.southboundcli.NodeListener"
-          init-method="start"
-          destroy-method="close">
-        <argument ref="dataBroker"/>
+        <argument ref="flowGroupCacheManager"/>
     </bean>
 
     <odl:rpc-implementation ref="reconciliationService"/>
 
-    <!--To assert references for CLI implementations-->
-    <command-bundle xmlns="http://karaf.apache.org/xmlns/shell/v1.0.0">
-        <command name="openflow/getallnodes">
-            <action class="org.opendaylight.openflowplugin.applications.southboundcli.cli.GetAllNodesCommandProvider">
-                <property name="dataBroker" ref="dataBroker" />
-                <property name="nodeListener" ref="nodeListener"/>
-            </action>
-        </command>
-        <command name="openflow/shownode">
-            <action class="org.opendaylight.openflowplugin.applications.southboundcli.cli.ShowNodeCommandProvider">
-                <property name="dataBroker" ref="dataBroker" />
-            </action>
-        </command>
-        <command name="openflow/reconcile">
-            <action class="org.opendaylight.openflowplugin.applications.southboundcli.cli.Reconciliation">
-                <property name="reconciliationService" ref="reconciliationService"/>
-            </action>
-        </command>
-        <command name="openflow/getreconciliationcount">
-            <action class="org.opendaylight.openflowplugin.applications.southboundcli.cli.ReconciliationCount">
-                <property name="dataBroker" ref="dataBroker"/>
-            </action>
-        </command>
-    </command-bundle>
-
 </blueprint>
index 616e0a73a50dfce889030b3ca751939ab1baa56e..335f4f918efc45219a0b7e3fda329b1f7cd9f716 100644 (file)
@@ -35,29 +35,6 @@ module reconciliation {
         }
     }
 
-    container reconciliation-state {
-         description "Reconciliation state for the given openflow nodes";
-         config false;
-         list reconciliation-state-list {
-              key node-id;
-              leaf node-id {
-                   type uint64;
-              }
-              uses node-reconcile-state;
-         }
-    }
-
-    grouping node-reconcile-state {
-         leaf state {
-              description "Current state of the node reconciliation";
-              type enumeration {
-                   enum IN_PROGRESS;
-                   enum COMPLETED;
-                   enum FAILED;
-              }
-         }
-    }
-
     rpc reconcile {
         description "Request the reconciliation for given device or set of devices to the controller.";
         input {
diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/FlowGroupCacheManager.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/FlowGroupCacheManager.java
new file mode 100644 (file)
index 0000000..5a49578
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.api.openflow;
+
+import java.util.Map;
+
+public interface FlowGroupCacheManager {
+
+    Map<String, ReconciliationState> getReconciliationStates();
+}
diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/ReconciliationState.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/ReconciliationState.java
new file mode 100644 (file)
index 0000000..37920c1
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2019 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.api.openflow;
+
+import java.time.LocalDateTime;
+
+import org.eclipse.jdt.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReconciliationState {
+    private static final Logger LOG = LoggerFactory.getLogger(ReconciliationState.class);
+
+    public enum ReconciliationStatus {
+        STARTED,
+        COMPLETED,
+        FAILED
+    }
+
+    private ReconciliationStatus status;
+    private LocalDateTime time;
+
+    public ReconciliationState(@Nullable ReconciliationStatus status, LocalDateTime time) {
+        this.status = status;
+        this.time = time;
+    }
+
+    public ReconciliationStatus getState() {
+        return status;
+    }
+
+    public void setState(ReconciliationStatus staTus, LocalDateTime timing) {
+        this.status = staTus;
+        this.time = timing;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%-25s %-25s", this.status, this.time);
+    }
+}
+
diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/cache/FlowGroupCacheManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/cache/FlowGroupCacheManagerImpl.java
new file mode 100644 (file)
index 0000000..5d2323e
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.impl.services.cache;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.inject.Singleton;
+import org.apache.aries.blueprint.annotation.service.Service;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
+import org.opendaylight.openflowplugin.api.openflow.ReconciliationState;
+
+@Singleton
+@Service(classes = FlowGroupCacheManager.class)
+public class FlowGroupCacheManagerImpl implements FlowGroupCacheManager {
+
+    private Map<String, ReconciliationState> reconciliationStates = new ConcurrentHashMap<>();
+
+    @Override
+    public Map<String, ReconciliationState> getReconciliationStates() {
+        return reconciliationStates;
+    }
+}
\ No newline at end of file