*/
public interface FlowNodeReconciliation extends ReconciliationNotificationListener, AutoCloseable {
ListenableFuture<Boolean> reconcileConfiguration(InstanceIdentifier<FlowCapableNode> connectedNode);
+
+ void flowNodeDisconnected(InstanceIdentifier<FlowCapableNode> disconnectedNode);
}
--- /dev/null
+/*
+ * Copyright (c) 2020 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frm;
+
+import java.lang.management.ManagementFactory;
+import javax.inject.Inject;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class ReconciliationJMXAgent {
+ private static final Logger LOG = LoggerFactory.getLogger(ReconciliationJMXAgent.class);
+ private static final String OF_RECONC_BEANNAME
+ = "org.opendaylight.openflowplugin.frm:type=ReconciliationState";
+ private MBeanServer mbs = null;
+ private ObjectName objectName = null;
+
+ @Inject
+ public ReconciliationJMXAgent(final ReconciliationJMXService reconciliationJMXService) {
+ mbs = ManagementFactory.getPlatformMBeanServer();
+ try {
+ objectName = new ObjectName(OF_RECONC_BEANNAME);
+ registerReconciliationMbean(reconciliationJMXService);
+ } catch (MalformedObjectNameException e) {
+ LOG.error("ObjectName instance creation failed for Mbean {} : ", OF_RECONC_BEANNAME, e);
+ }
+ }
+
+ public void registerReconciliationMbean(ReconciliationJMXService reconciliationJMXService) {
+ try {
+ // Uniquely identify the MBeans and register them with the platform MBeanServer
+ if (!mbs.isRegistered(objectName)) {
+ mbs.registerMBean(reconciliationJMXService, objectName);
+ LOG.debug("Registered Mbean {} successfully", OF_RECONC_BEANNAME);
+ }
+ } catch (MBeanRegistrationException | InstanceAlreadyExistsException | NotCompliantMBeanException e) {
+ LOG.error("Registeration failed for Mbean {} : ", OF_RECONC_BEANNAME , e);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frm;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.inject.Inject;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
+
+public class ReconciliationJMXService implements ReconciliationJMXServiceMBean {
+ private FlowGroupCacheManager flowGroupCacheManager;
+
+ @Inject
+ public ReconciliationJMXService(final FlowGroupCacheManager floGroupCacheManager) {
+ this.flowGroupCacheManager = floGroupCacheManager;
+ }
+
+ @Override
+ public Map<String, String> acquireReconciliationStates() {
+ Map<String, String> reconciliationStatesMap = new HashMap<>();
+ flowGroupCacheManager.getReconciliationStates().forEach((datapathId, reconciliationState) ->
+ reconciliationStatesMap.put(datapathId, reconciliationState.toString()));
+ return reconciliationStatesMap;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frm;
+
+import java.util.Map;
+
+public interface ReconciliationJMXServiceMBean {
+ /* The MapKey is datapathId
+ The MapValue is ReconciliationStates of the respective nodes.
+ */
+ Map<String, String> acquireReconciliationStates();
+}
\ No newline at end of file
if (activeNodes.contains(nodeIdent)) {
Set<InstanceIdentifier<FlowCapableNode>> set = Sets.newHashSet(activeNodes);
set.remove(nodeIdent);
+ reconcliationAgent.flowNodeDisconnected(nodeIdent);
activeNodes = Collections.unmodifiableSet(set);
setNodeOperationalStatus(nodeIdent, false);
}
*/
package org.opendaylight.openflowplugin.applications.frm.impl;
+import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.COMPLETED;
+import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.FAILED;
+import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.STARTED;
+
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.math.BigInteger;
+import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
+import org.opendaylight.openflowplugin.api.openflow.ReconciliationState;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
private static final AtomicLong BUNDLE_ID = new AtomicLong();
private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
+ private Map<String, ReconciliationState> reconciliationStates;
public FlowNodeReconciliationImpl(final ForwardingRulesManager manager, final DataBroker db,
- final String serviceName, final int priority, final ResultState resultState) {
+ final String serviceName, final int priority, final ResultState resultState,
+ final FlowGroupCacheManager flowGroupCacheManager) {
this.provider = Preconditions.checkNotNull(manager, "ForwardingRulesManager can not be null!");
dataBroker = Preconditions.checkNotNull(db, "DataBroker can not be null!");
this.serviceName = serviceName;
this.resultState = resultState;
salBundleService = Preconditions.checkNotNull(manager.getSalBundleService(),
"salBundleService can not be null!");
+ reconciliationStates = flowGroupCacheManager.getReconciliationStates();
}
@Override
}
}
+ @Override
+ public void flowNodeDisconnected(InstanceIdentifier<FlowCapableNode> disconnectedNode) {
+ String node = disconnectedNode.firstKeyOf(Node.class).getId().getValue();
+ BigInteger dpnId = getDpnIdFromNodeName(node);
+ reconciliationStates.remove(dpnId.toString());
+ }
+
private class BundleBasedReconciliationTask implements Callable<Boolean> {
final InstanceIdentifier<FlowCapableNode> nodeIdentity;
} catch (ExecutionException | InterruptedException e) {
LOG.error("Error occurred while reading the configuration data store for node {}", nodeIdentity, e);
}
- try {
- if (flowNode.isPresent()) {
- LOG.debug("FlowNode present for Datapath ID {}", dpnId);
- OF_EVENT_LOG.debug("Bundle Reconciliation Start, Node: {}", dpnId);
- final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
-
- final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
- .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
- .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
-
- final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
- .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
- .setType(BundleControlType.ONFBCTOPENREQUEST).build();
-
- final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
- .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
- .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
-
- final AddBundleMessagesInput deleteAllFlowGroupsInput = new AddBundleMessagesInputBuilder()
- .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
- .setMessages(createMessages(nodeRef)).build();
-
- LOG.debug("Closing openflow bundle for device {}", dpnId);
- /* Close previously opened bundle on the openflow switch if any */
- ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle
- = salBundleService.controlBundle(closeBundleInput);
-
- /* Open a new bundle on the switch */
- ListenableFuture<RpcResult<ControlBundleOutput>> openBundle =
+
+ if (flowNode.isPresent()) {
+ ReconciliationState reconciliationState = new ReconciliationState(
+ STARTED, LocalDateTime.now());
+ //put the dpn info into the map
+ reconciliationStates.put(dpnId.toString(), reconciliationState);
+ LOG.debug("FlowNode present for Datapath ID {}", dpnId);
+ OF_EVENT_LOG.debug("Bundle Reconciliation Start, Node: {}", dpnId);
+ final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
+
+ final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
+ .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+ .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
+
+ final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
+ .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+ .setType(BundleControlType.ONFBCTOPENREQUEST).build();
+
+ final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
+ .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+ .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
+
+ final AddBundleMessagesInput deleteAllFlowGroupsInput = new AddBundleMessagesInputBuilder()
+ .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+ .setMessages(createMessages(nodeRef)).build();
+
+ LOG.debug("Closing openflow bundle for device {}", dpnId);
+ /* Close previously opened bundle on the openflow switch if any */
+ ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle
+ = salBundleService.controlBundle(closeBundleInput);
+
+ /* Open a new bundle on the switch */
+ ListenableFuture<RpcResult<ControlBundleOutput>> openBundle =
Futures.transformAsync(closeBundle,
rpcResult -> salBundleService.controlBundle(openBundleInput),
service);
/* Push groups and flows via bundle add messages */
- ListenableFuture<RpcResult<AddBundleMessagesOutput>> deleteAllFlowGroupsFuture
- = Futures.transformAsync(openBundle, rpcResult -> {
- if (rpcResult.isSuccessful()) {
- return salBundleService.addBundleMessages(deleteAllFlowGroupsInput);
- }
- return Futures.immediateFuture(null);
- }, service);
-
- /* Push flows and groups via bundle add messages */
- Optional<FlowCapableNode> finalFlowNode = flowNode;
- ListenableFuture<List<RpcResult<AddBundleMessagesOutput>>> addbundlesFuture
- = Futures.transformAsync(deleteAllFlowGroupsFuture, rpcResult -> {
- if (rpcResult.isSuccessful()) {
- LOG.debug("Adding delete all flow/group message is successful for device {}",dpnId);
- return Futures.allAsList(addBundleMessages(finalFlowNode.get(), bundleIdValue,
- nodeIdentity));
- }
- return Futures.immediateFuture(null);
- }, service);
+ ListenableFuture<RpcResult<AddBundleMessagesOutput>> deleteAllFlowGroupsFuture
+ = Futures.transformAsync(openBundle, rpcResult -> {
+ if (rpcResult.isSuccessful()) {
+ return salBundleService.addBundleMessages(deleteAllFlowGroupsInput);
+ }
+ return Futures.immediateFuture(null);
+ }, service);
+
+ /* Push flows and groups via bundle add messages */
+ Optional<FlowCapableNode> finalFlowNode = flowNode;
+ ListenableFuture<List<RpcResult<AddBundleMessagesOutput>>> addbundlesFuture
+ = Futures.transformAsync(deleteAllFlowGroupsFuture, rpcResult -> {
+ if (rpcResult.isSuccessful()) {
+ LOG.debug("Adding delete all flow/group message is successful for device {}", dpnId);
+ return Futures.allAsList(addBundleMessages(finalFlowNode.get(), bundleIdValue,
+ nodeIdentity));
+ }
+ return Futures.immediateFuture(null);
+ }, service);
/* Commit the bundle on the openflow switch */
- ListenableFuture<RpcResult<ControlBundleOutput>> commitBundleFuture = Futures.transformAsync(
- addbundlesFuture, rpcResult -> {
+ ListenableFuture<RpcResult<ControlBundleOutput>> commitBundleFuture
+ = Futures.transformAsync(addbundlesFuture, rpcResult -> {
LOG.debug("Adding bundle messages completed for device {}", dpnId);
return salBundleService.controlBundle(commitBundleInput);
}, service);
- /* Bundles not supported for meters */
- List<Meter> meters = flowNode.get().getMeter() != null ? flowNode.get().getMeter()
- : Collections.emptyList();
- Futures.transformAsync(commitBundleFuture,
- rpcResult -> {
- if (rpcResult.isSuccessful()) {
- for (Meter meter : meters) {
- final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdentity
- .child(Meter.class, meter.key());
- provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity);
- }
+ /* Bundles not supported for meters */
+ List<Meter> meters = flowNode.get().getMeter() != null ? flowNode.get().getMeter()
+ : Collections.emptyList();
+ Futures.transformAsync(commitBundleFuture,
+ rpcResult -> {
+ if (rpcResult.isSuccessful()) {
+ for (Meter meter : meters) {
+ final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdentity
+ .child(Meter.class, meter.key());
+ provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity);
}
- return Futures.immediateFuture(null);
- }, service);
- try {
- RpcResult<ControlBundleOutput> bundleFuture = commitBundleFuture.get();
- if (bundleFuture != null && bundleFuture.isSuccessful()) {
- LOG.debug("Completing bundle based reconciliation for device ID:{}", dpnId);
- OF_EVENT_LOG.debug("Bundle Reconciliation Finish, Node: {}", dpnId);
- return true;
- } else {
- LOG.error("commit bundle failed for device {} with error {}", dpnId,
- commitBundleFuture.get().getErrors());
- return false;
}
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Error while doing bundle based reconciliation for device ID:{}", dpnId);
+ return Futures.immediateFuture(null);
+ }, service);
+ try {
+ RpcResult<ControlBundleOutput> bundleFuture = commitBundleFuture.get();
+ if (bundleFuture != null && bundleFuture.isSuccessful()) {
+ reconciliationState.setState(COMPLETED, LocalDateTime.now());
+ LOG.debug("Completing bundle based reconciliation for device ID:{}", dpnId);
+ OF_EVENT_LOG.debug("Bundle Reconciliation Finish, Node: {}", dpnId);
+ return true;
+ } else {
+ reconciliationState.setState(FAILED, LocalDateTime.now());
+ LOG.error("commit bundle failed for device {} with error {}", dpnId,
+ commitBundleFuture.get().getErrors());
return false;
}
+ } catch (InterruptedException | ExecutionException e) {
+ reconciliationState.setState(FAILED, LocalDateTime.now());
+ LOG.error("commit bundle failed for device {} with error ", dpnId, e);
+ return false;
+ } finally {
+ service.shutdown();
}
- LOG.error("FlowNode not present for Datapath ID {}", dpnId);
- return false;
- } finally {
- service.shutdown();
}
+ LOG.error("FlowNode not present for Datapath ID {}", dpnId);
+ return false;
}
}
/* Tables - have to be pushed before groups */
// CHECK if while pushing the update, updateTableInput can be null to emulate a
// table add
+ ReconciliationState reconciliationState = new ReconciliationState(
+ STARTED, LocalDateTime.now());
+ //put the dpn info into the map
+ reconciliationStates.put(dpnId.toString(), reconciliationState);
+ LOG.debug("Triggering reconciliation for node {} with state: {}", dpnId, STARTED);
List<TableFeatures> tableList = flowNode.get().getTableFeatures() != null
? flowNode.get().getTableFeatures()
: Collections.<TableFeatures>emptyList();
provider.getFlowCommiter().add(flowIdent, flow, nodeIdentity);
}
}
+ reconciliationState.setState(COMPLETED, LocalDateTime.now());
OF_EVENT_LOG.debug("Reconciliation Finish, Node: {}, flow count: {}", dpnId, flowCount);
}
return true;
import org.opendaylight.mdsal.binding.api.RpcProviderService;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
import org.opendaylight.openflowplugin.applications.frm.BundleMessagesCommiter;
private boolean isBundleBasedReconciliationEnabled;
private final OpenflowServiceRecoveryHandler openflowServiceRecoveryHandler;
private final ServiceRecoveryRegistry serviceRecoveryRegistry;
+ private final FlowGroupCacheManager flowGroupCacheManager;
@Inject
public ForwardingRulesManagerImpl(@Reference final DataBroker dataBroker,
@Reference final ConfigurationService configurationService,
@Reference final ReconciliationManager reconciliationManager,
final OpenflowServiceRecoveryHandler openflowServiceRecoveryHandler,
- @Reference final ServiceRecoveryRegistry serviceRecoveryRegistry) {
+ @Reference final ServiceRecoveryRegistry serviceRecoveryRegistry,
+ @Reference final FlowGroupCacheManager flowGroupCacheManager) {
disableReconciliation = config.isDisableReconciliation();
staleMarkingEnabled = config.isStaleMarkingEnabled();
reconciliationRetryCount = config.getReconciliationRetryCount().toJava();
this.reconciliationManager = reconciliationManager;
this.rpcProviderService = rpcProviderService;
this.mastershipChangeServiceManager = mastershipChangeServiceManager;
+ this.flowGroupCacheManager = flowGroupCacheManager;
Preconditions.checkArgument(rpcRegistry != null, "RpcProviderRegistry can not be null !");
public void start() {
nodeConfigurator = new NodeConfiguratorImpl();
this.devicesGroupRegistry = new DevicesGroupRegistry();
-
this.nodeListener = new FlowNodeReconciliationImpl(this, dataService, SERVICE_NAME, FRM_RECONCILIATION_PRIORITY,
- ResultState.DONOTHING);
+ ResultState.DONOTHING, flowGroupCacheManager);
if (this.isReconciliationDisabled()) {
LOG.debug("Reconciliation is disabled by user");
} else {
<odl:clustered-app-config id="forwardingRulesManagerConfig"
binding-class="org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.forwardingrules.manager.config.rev160511.ForwardingRulesManagerConfig"/>
+ <bean id="reconciliationJMXService"
+ class="org.opendaylight.openflowplugin.applications.frm.ReconciliationJMXService">
+ <argument ref="flowGroupCacheManager"/>
+ </bean>
+ <bean id="reconciliationJMXAgent"
+ class="org.opendaylight.openflowplugin.applications.frm.ReconciliationJMXAgent">
+ <argument ref="reconciliationJMXService"/>
+ </bean>
+ <bean id="reconciliationJMXServiceMBean"
+ class="org.opendaylight.openflowplugin.applications.frm.ReconciliationJMXService">
+ <argument ref="flowGroupCacheManager"/>
+ </bean>
+
+ <service ref="reconciliationJMXServiceMBean" interface="org.opendaylight.openflowplugin.applications.frm.ReconciliationJMXServiceMBean"/>
+
</blueprint>
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
import org.opendaylight.openflowplugin.applications.frm.impl.DeviceMastershipManager;
import org.opendaylight.openflowplugin.applications.frm.impl.ForwardingRulesManagerImpl;
private ServiceRecoveryRegistry serviceRecoveryRegistry;
@Mock
private MastershipChangeServiceManager mastershipChangeServiceManager;
+ @Mock
+ private FlowGroupCacheManager flowGroupCacheManager;
@Before
public void setUp() {
forwardingRulesManager = new ForwardingRulesManagerImpl(getDataBroker(), rpcProviderRegistryMock,
rpcProviderRegistryMock, getConfig(), mastershipChangeServiceManager, clusterSingletonService,
getConfigurationService(), reconciliationManager, openflowServiceRecoveryHandler,
- serviceRecoveryRegistry);
-
+ serviceRecoveryRegistry, flowGroupCacheManager);
forwardingRulesManager.start();
// TODO consider tests rewrite (added because of complicated access)
forwardingRulesManager.setDeviceMastershipManager(deviceMastershipManager);
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
import org.opendaylight.openflowplugin.applications.frm.impl.DeviceMastershipManager;
import org.opendaylight.openflowplugin.applications.frm.impl.ForwardingRulesManagerImpl;
private ServiceRecoveryRegistry serviceRecoveryRegistry;
@Mock
private MastershipChangeServiceManager mastershipChangeServiceManager;
+ @Mock
+ private FlowGroupCacheManager flowGroupCacheManager;
+
@Before
public void setUp() {
getConfigurationService(),
reconciliationManager,
openflowServiceRecoveryHandler,
- serviceRecoveryRegistry);
+ serviceRecoveryRegistry,
+ flowGroupCacheManager);
forwardingRulesManager.start();
// TODO consider tests rewrite (added because of complicated access)
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
import org.opendaylight.openflowplugin.applications.frm.impl.DeviceMastershipManager;
import org.opendaylight.openflowplugin.applications.frm.impl.ForwardingRulesManagerImpl;
private ServiceRecoveryRegistry serviceRecoveryRegistry;
@Mock
private MastershipChangeServiceManager mastershipChangeServiceManager;
+ @Mock
+ private FlowGroupCacheManager flowGroupCacheManager;
+
@Before
public void setUp() {
getConfigurationService(),
reconciliationManager,
openflowServiceRecoveryHandler,
- serviceRecoveryRegistry);
+ serviceRecoveryRegistry,
+ flowGroupCacheManager);
forwardingRulesManager.start();
// TODO consider tests rewrite (added because of complicated access)
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
import org.opendaylight.openflowplugin.applications.frm.impl.ForwardingRulesManagerImpl;
import org.opendaylight.openflowplugin.applications.frm.recovery.OpenflowServiceRecoveryHandler;
private ServiceRecoveryRegistry serviceRecoveryRegistry;
@Mock
private MastershipChangeServiceManager mastershipChangeServiceManager;
+ @Mock
+ private FlowGroupCacheManager flowGroupCacheManager;
+
@Before
public void setUp() {
getConfigurationService(),
reconciliationManager,
openflowServiceRecoveryHandler,
- serviceRecoveryRegistry);
-
-
+ serviceRecoveryRegistry,
+ flowGroupCacheManager);
forwardingRulesManager.start();
}
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
import org.opendaylight.openflowplugin.applications.frm.impl.DeviceMastershipManager;
import org.opendaylight.openflowplugin.applications.frm.impl.ForwardingRulesManagerImpl;
private ServiceRecoveryRegistry serviceRecoveryRegistry;
@Mock
private MastershipChangeServiceManager mastershipChangeServiceManager;
+ @Mock
+ private FlowGroupCacheManager flowGroupCacheManager;
@Before
public void setUp() {
forwardingRulesManager = new ForwardingRulesManagerImpl(getDataBroker(), rpcProviderRegistryMock,
rpcProviderRegistryMock, getConfig(), mastershipChangeServiceManager, clusterSingletonService,
getConfigurationService(), reconciliationManager, openflowServiceRecoveryHandler,
- serviceRecoveryRegistry);
-
+ serviceRecoveryRegistry, flowGroupCacheManager);
forwardingRulesManager.start();
// TODO consider tests rewrite (added because of complicated access)
forwardingRulesManager.setDeviceMastershipManager(deviceMastershipManager);
package org.opendaylight.openflowplugin.applications.southboundcli;
import static java.util.Objects.requireNonNull;
-import static org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.NodeReconcileState.State.COMPLETED;
-import static org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.NodeReconcileState.State.FAILED;
-import static org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.NodeReconcileState.State.INPROGRESS;
+import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.COMPLETED;
+import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.FAILED;
+import static org.opendaylight.openflowplugin.api.openflow.ReconciliationState.ReconciliationStatus.STARTED;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.math.BigInteger;
import java.text.SimpleDateFormat;
+import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.opendaylight.mdsal.binding.api.DataBroker;
-import org.opendaylight.mdsal.binding.api.ReadTransaction;
import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
+import org.opendaylight.openflowplugin.api.openflow.ReconciliationState;
import org.opendaylight.openflowplugin.applications.southboundcli.alarm.AlarmAgent;
import org.opendaylight.openflowplugin.applications.southboundcli.util.OFNode;
import org.opendaylight.openflowplugin.applications.southboundcli.util.ShellUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.NodeReconcileState.State;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconcileOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconciliationCounter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconciliationService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.ReconciliationState;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounterBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.counter.ReconcileCounterKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.state.ReconciliationStateList;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.state.ReconciliationStateListBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.reconciliation.service.rev180227.reconciliation.state.ReconciliationStateListKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
private final Long startCount = 1L;
private final int threadPoolSize = 10;
private final ExecutorService executor = Executors.newWorkStealingPool(threadPoolSize);
+ private volatile Map<String, ReconciliationState> reconciliationStates = new ConcurrentHashMap();
public ReconciliationServiceImpl(final DataBroker broker, final FrmReconciliationService frmReconciliationService,
- final AlarmAgent alarmAgent, final NodeListener nodeListener) {
+ final AlarmAgent alarmAgent, final NodeListener nodeListener,
+ final FlowGroupCacheManager flowGroupCacheManager) {
this.broker = broker;
this.frmReconciliationService = frmReconciliationService;
this.alarmAgent = alarmAgent;
this.nodeListener = requireNonNull(nodeListener, "NodeListener cannot be null!");
+ reconciliationStates = flowGroupCacheManager.getReconciliationStates();
}
@Override
}
List<Uint64> inprogressNodes = new ArrayList<>();
nodesToReconcile.parallelStream().forEach(nodeId -> {
- Optional<ReconciliationStateList> state = getReconciliationState(nodeId);
- if (state.isPresent() && state.get().getState().equals(INPROGRESS)) {
+ ReconciliationState state = getReconciliationState(nodeId);
+ if (state != null && state.getState().equals(STARTED)) {
inprogressNodes.add(Uint64.valueOf(nodeId));
} else {
alarmAgent.raiseNodeReconciliationAlarm(nodeId);
- LOG.info("Executing reconciliation for node {}", nodeId);
+ LOG.info("Executing reconciliation for node {} with state ", nodeId);
NodeKey nodeKey = new NodeKey(new NodeId("openflow:" + nodeId));
ReconciliationTask reconcileTask = new ReconciliationTask(new BigInteger(String.valueOf(nodeId)),
nodeKey);
}
}
- private Optional<ReconciliationStateList> getReconciliationState(final Long nodeId) {
- InstanceIdentifier<ReconciliationStateList> instanceIdentifier = InstanceIdentifier
- .builder(ReconciliationState.class).child(ReconciliationStateList.class,
- new ReconciliationStateListKey(new BigInteger(String.valueOf(nodeId)))).build();
- try (ReadTransaction tx = broker.newReadOnlyTransaction()) {
- return tx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier).get();
-
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Exception while reading reconciliation state for {}", nodeId, e);
- }
- return Optional.empty();
+ private ReconciliationState getReconciliationState(final Long nodeId) {
+ return reconciliationStates.get(nodeId.toString());
}
private ListenableFuture<RpcResult<ReconcileOutput>> buildErrorResponse(String msg) {
ReconcileNodeInput reconInput = new ReconcileNodeInputBuilder()
.setNodeId(nodeId).setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
.child(Node.class, nodeKey).build())).build();
- updateReconciliationState(INPROGRESS);
+ updateReconciliationState(STARTED);
Future<RpcResult<ReconcileNodeOutput>> reconOutput = frmReconciliationService
.reconcileNode(reconInput);
try {
return Optional.empty();
}
- private void updateReconciliationState(State state) {
- ReadWriteTransaction tx = broker.newReadWriteTransaction();
- InstanceIdentifier<ReconciliationStateList> instanceIdentifier = InstanceIdentifier
- .builder(ReconciliationState.class).child(ReconciliationStateList.class,
- new ReconciliationStateListKey(nodeId)).build();
- ReconciliationStateListBuilder stateBuilder = new ReconciliationStateListBuilder()
- .withKey(new ReconciliationStateListKey(nodeId))
- .setState(state);
- try {
- tx.merge(LogicalDatastoreType.OPERATIONAL, instanceIdentifier, stateBuilder.build(), true);
- tx.commit().get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Exception while updating reconciliation state: {}", nodeId, e);
- }
+
+ private void updateReconciliationState(ReconciliationState.ReconciliationStatus status) {
+ ReconciliationState state = new ReconciliationState(status, LocalDateTime.now());
+ reconciliationStates.put(nodeId.toString(),state);
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2020 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.southboundcli.cli;
+
+import com.google.common.net.InetAddresses;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import com.google.gson.reflect.TypeToken;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.lang.reflect.Type;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.karaf.shell.commands.Command;
+import org.apache.karaf.shell.commands.Option;
+import org.apache.karaf.shell.console.OsgiCommandSupport;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.infrautils.diagstatus.ClusterMemberInfo;
+import org.opendaylight.openflowplugin.applications.frm.ReconciliationJMXServiceMBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Command(scope = "openflow", name = "getreconciliationstate",
+ description = "Print reconciliation state for all devices")
+public class GetReconciliationStateProvider extends OsgiCommandSupport {
+
+ @Option(name = "-d", description = "Node Id")
+ String nodeId;
+
+ private static final Logger LOG = LoggerFactory.getLogger(GetReconciliationStateProvider.class);
+ private static final String URL_PREFIX = "http://";
+ private static final String URL_SEPARATOR = "/";
+ private static final String URL_SEPARATOR_COLON = ":";
+ private static final String HTTP_JOL_OKIA_BASE_URI = "/jolokia/exec/";
+ private static final int HTTP_TIMEOUT = 5000;
+ private final Integer httpPort;
+ private static final String JMX_OBJECT_NAME
+ = "org.opendaylight.openflowplugin.frm:type=ReconciliationState";
+ private static final String JMX_ATTRIBUTE_NAME = "acquireReconciliationStates";
+ private static final String JMX_REST_HTTP_AUTH_UNAME_PWD = "admin:admin";
+ private ReconciliationJMXServiceMBean reconciliationJMXServiceMBean;
+ private ClusterMemberInfo clusterMemberInfoProvider;
+
+
+ public GetReconciliationStateProvider(final ReconciliationJMXServiceMBean reconciliationJMXServiceMBean,
+ final ClusterMemberInfo clusterMemberInfoProvider,
+ @Nullable Integer httpPort) {
+ this.reconciliationJMXServiceMBean = reconciliationJMXServiceMBean;
+ this.clusterMemberInfoProvider = clusterMemberInfoProvider;
+ this.httpPort = httpPort;
+ }
+
+ @Override
+ protected Object doExecute() throws Exception {
+ List<String> result = new ArrayList<>();
+ Map<String, String> reconciliationStates = getClusterwideReconcilitionStates();
+ if (nodeId == null) {
+ if (!reconciliationStates.isEmpty()) {
+ reconciliationStates.forEach((datapathId, reconciliationState) -> {
+ String status = String.format("%-17s %-50s", datapathId, reconciliationState);
+ result.add(status);
+ });
+ printReconciliationStates(result);
+ } else {
+ session.getConsole().println("Reconciliation data not available");
+ }
+ }
+ else {
+ String reconciliationState = getReconciliationStateForNode();
+ if (reconciliationState != null) {
+ String status = String.format("%-17s %-50s", nodeId, reconciliationState);
+ result.add(status);
+ printReconciliationStates(result);
+ } else {
+ session.getConsole().println("Reconciliation data not available for the specified node");
+ }
+ }
+ return null;
+ }
+
+ private String getReconciliationStateForNode() {
+ //first checking reconciliation state locally
+ String reconciliationState = reconciliationJMXServiceMBean.acquireReconciliationStates().get(nodeId);
+ if (reconciliationState == null) {
+ //checking reconciliation state in the cluster
+ reconciliationState = getClusterwideReconcilitionStates().get(nodeId);
+ }
+ return reconciliationState;
+ }
+
+ private void printReconciliationStates(List<String> result) {
+ session.getConsole().println(getHeaderOutput());
+ session.getConsole().println(getLineSeparator());
+ result.stream().forEach(p -> session.getConsole().println(p));
+ }
+
+ private String getHeaderOutput() {
+ String header = String.format("%-17s %-25s %-25s", "DatapathId", "Reconciliation Status",
+ "Reconciliation Time");
+ return header;
+ }
+
+ private String getLineSeparator() {
+ return "-------------------------------------------------------------------";
+ }
+
+ @SuppressWarnings("IllegalCatch")
+ private Map<String,String> getClusterwideReconcilitionStates() {
+ Map<String,String> clusterwideReconcStates = new HashMap<>();
+ List<String> clusterIPAddresses = clusterMemberInfoProvider.getClusterMembers().stream()
+ .map(s -> s.getHostAddress()).collect(Collectors.toList());
+ LOG.debug("The ip address of nodes in the cluster : {}", clusterIPAddresses);
+ if (!clusterIPAddresses.isEmpty()) {
+ String selfAddress = clusterMemberInfoProvider.getSelfAddress() != null
+ ? clusterMemberInfoProvider.getSelfAddress().getHostAddress() : ("localhost");
+ LOG.trace("The ip address of local node is {}", selfAddress);
+ for (String memberAddress : clusterIPAddresses) {
+ try {
+ if (memberAddress.equals(selfAddress)) {
+ clusterwideReconcStates.putAll(getLocalStatusSummary());
+ } else {
+ clusterwideReconcStates.putAll(getRemoteReconciliationStates(memberAddress));
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while reaching Host {}", memberAddress, e);
+ }
+ }
+ } else {
+ LOG.info("Could not obtain cluster members or the cluster-command is being executed locally\n");
+ }
+ return clusterwideReconcStates;
+ }
+
+ @SuppressWarnings("IllegalCatch")
+ private Map<String, String> getRemoteReconciliationStates(String ipAddress) {
+ Map<String, String> jmxReconciliationStates = new HashMap<>();
+ try {
+ String getReconcilationRemoteResponse = invokeRemoteRestOperation(ipAddress);
+ if (getReconcilationRemoteResponse != null) {
+ JsonElement rootObj = new JsonParser().parse(getReconcilationRemoteResponse);
+ String remoteJMXOperationResult = rootObj.getAsJsonObject().get("value").toString();
+ Type type = new TypeToken<HashMap<String, String>>() {}.getType();
+ jmxReconciliationStates.putAll(new Gson().fromJson(remoteJMXOperationResult, type));
+ }
+ } catch (Exception e) {
+ LOG.error("Exception during reconciliation states from device with ip address {}", ipAddress, e);
+ }
+ return jmxReconciliationStates;
+ }
+
+ private Map<String,String> getLocalStatusSummary() {
+ return reconciliationJMXServiceMBean.acquireReconciliationStates();
+ }
+
+ @SuppressFBWarnings("DM_DEFAULT_ENCODING")
+ private String invokeRemoteRestOperation(String ipAddress) throws Exception {
+ String restUrl = buildRemoteReconcilationUrl(ipAddress);
+ LOG.info("invokeRemoteReconcilationState() REST URL: {}", restUrl);
+ String authString = JMX_REST_HTTP_AUTH_UNAME_PWD;
+ byte[] authEncBytes = Base64.encodeBase64(authString.getBytes());
+ String authStringEnc = new String(authEncBytes);
+ HttpRequest request = HttpRequest.newBuilder(URI.create(restUrl))
+ .timeout(Duration.ofMillis(HTTP_TIMEOUT))
+ .header("Authorization","Basic " + authStringEnc)
+ .header("Accept","application/json")
+ .GET()
+ .build();
+
+ LOG.debug("sending http request for accessing remote reconcilation");
+ HttpResponse<String> response = HttpClient.newBuilder()
+ .connectTimeout(request.timeout().get().plusMillis(1000))
+ .build()
+ .send(request, HttpResponse.BodyHandlers.ofString());
+ // Response code for success should be 200
+ Integer httpResponseCode = response.statusCode();
+ LOG.debug("http response received for remote reconcilation {}", httpResponseCode);
+ String respStr = response.body();
+ if (httpResponseCode > 299) {
+ LOG.error("Non-200 http response code received {} for URL {}", httpResponseCode, restUrl);
+ if (respStr == null || respStr.isEmpty()) {
+ return "Service Status Retrieval failed. HTTP Response Code : " + httpResponseCode + "\n";
+ }
+ }
+ LOG.trace("HTTP Response is - {} for URL {}", respStr, restUrl);
+
+ return respStr;
+ }
+
+
+ String buildRemoteReconcilationUrl(String host) {
+ String targetHostAsString;
+ InetAddress hostInetAddress = InetAddresses.forString(host);
+ if (hostInetAddress instanceof Inet6Address) {
+ targetHostAsString = '[' + hostInetAddress.getHostAddress() + ']';
+ } else {
+ targetHostAsString = hostInetAddress.getHostAddress();
+ }
+ return new StringBuilder().append(URL_PREFIX).append(targetHostAsString).append(URL_SEPARATOR_COLON)
+ .append(httpPort).append(HTTP_JOL_OKIA_BASE_URI).append(JMX_OBJECT_NAME)
+ .append(URL_SEPARATOR).append(JMX_ATTRIBUTE_NAME).toString();
+ }
+}
\ No newline at end of file
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
+ xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.2.0">
+
+ <reference id="reconciliationJMXServiceMBean"
+ interface="org.opendaylight.openflowplugin.applications.frm.ReconciliationJMXServiceMBean"
+ availability="optional"/>
+ <cm:property-placeholder persistent-id="org.ops4j.pax.web" update-strategy="none">
+ <cm:default-properties>
+ <cm:property name="org.osgi.service.http.port" value="8181"/>
+ </cm:default-properties>
+ </cm:property-placeholder>
+ <reference id="clusterMemberInfoProvider"
+ interface="org.opendaylight.infrautils.diagstatus.ClusterMemberInfo"
+ availability="optional"/>
+
+ <command-bundle xmlns="http://karaf.apache.org/xmlns/shell/v1.0.0">
+ <command name="openflow/getallnodes">
+ <action class="org.opendaylight.openflowplugin.applications.southboundcli.cli.GetAllNodesCommandProvider">
+ <property name="dataBroker" ref="dataBroker" />
+ <property name="nodeListener" ref="nodeListener"/>
+ </action>
+ </command>
+ <command name="openflow/shownode">
+ <action class="org.opendaylight.openflowplugin.applications.southboundcli.cli.ShowNodeCommandProvider">
+ <property name="dataBroker" ref="dataBroker" />
+ </action>
+ </command>
+ <command name="openflow/reconcile">
+ <action class="org.opendaylight.openflowplugin.applications.southboundcli.cli.Reconciliation">
+ <property name="reconciliationService" ref="reconciliationService"/>
+ </action>
+ </command>
+ <command name="openflow/getreconciliationcount">
+ <action class="org.opendaylight.openflowplugin.applications.southboundcli.cli.ReconciliationCount">
+ <property name="dataBroker" ref="dataBroker"/>
+ </action>
+ </command>
+ <command name="openflow/getreconciliationstate">
+ <action class="org.opendaylight.openflowplugin.applications.southboundcli.cli.GetReconciliationStateProvider">
+ <argument ref ="reconciliationJMXServiceMBean"/>
+ <argument ref="clusterMemberInfoProvider"/>
+ <argument value="${org.osgi.service.http.port}"/>
+ </action>
+ </command>
+ </command-bundle>
+
+</blueprint>
\ No newline at end of file
<reference id="dataBroker"
interface="org.opendaylight.mdsal.binding.api.DataBroker"/>
+ <reference id="flowGroupCacheManager" interface="org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager"/>
<odl:rpc-service id="frmReconciliationService"
interface="org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.FrmReconciliationService"/>
<bean id="alarmAgent"
class="org.opendaylight.openflowplugin.applications.southboundcli.alarm.AlarmAgent" init-method="start">
</bean>
+ <bean id="nodeListener"
+ class="org.opendaylight.openflowplugin.applications.southboundcli.NodeListener"
+ init-method="start"
+ destroy-method="close">
+ <argument ref="dataBroker"/>
+ </bean>
<bean id="reconciliationService"
class="org.opendaylight.openflowplugin.applications.southboundcli.ReconciliationServiceImpl"
destroy-method="close">
<argument ref="frmReconciliationService"/>
<argument ref="alarmAgent"/>
<argument ref="nodeListener"/>
- </bean>
- <bean id="nodeListener"
- class="org.opendaylight.openflowplugin.applications.southboundcli.NodeListener"
- init-method="start"
- destroy-method="close">
- <argument ref="dataBroker"/>
+ <argument ref="flowGroupCacheManager"/>
</bean>
<odl:rpc-implementation ref="reconciliationService"/>
- <!--To assert references for CLI implementations-->
- <command-bundle xmlns="http://karaf.apache.org/xmlns/shell/v1.0.0">
- <command name="openflow/getallnodes">
- <action class="org.opendaylight.openflowplugin.applications.southboundcli.cli.GetAllNodesCommandProvider">
- <property name="dataBroker" ref="dataBroker" />
- <property name="nodeListener" ref="nodeListener"/>
- </action>
- </command>
- <command name="openflow/shownode">
- <action class="org.opendaylight.openflowplugin.applications.southboundcli.cli.ShowNodeCommandProvider">
- <property name="dataBroker" ref="dataBroker" />
- </action>
- </command>
- <command name="openflow/reconcile">
- <action class="org.opendaylight.openflowplugin.applications.southboundcli.cli.Reconciliation">
- <property name="reconciliationService" ref="reconciliationService"/>
- </action>
- </command>
- <command name="openflow/getreconciliationcount">
- <action class="org.opendaylight.openflowplugin.applications.southboundcli.cli.ReconciliationCount">
- <property name="dataBroker" ref="dataBroker"/>
- </action>
- </command>
- </command-bundle>
-
</blueprint>
}
}
- container reconciliation-state {
- description "Reconciliation state for the given openflow nodes";
- config false;
- list reconciliation-state-list {
- key node-id;
- leaf node-id {
- type uint64;
- }
- uses node-reconcile-state;
- }
- }
-
- grouping node-reconcile-state {
- leaf state {
- description "Current state of the node reconciliation";
- type enumeration {
- enum IN_PROGRESS;
- enum COMPLETED;
- enum FAILED;
- }
- }
- }
-
rpc reconcile {
description "Request the reconciliation for given device or set of devices to the controller.";
input {
--- /dev/null
+/*
+ * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.api.openflow;
+
+import java.util.Map;
+
+public interface FlowGroupCacheManager {
+
+ Map<String, ReconciliationState> getReconciliationStates();
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.api.openflow;
+
+import java.time.LocalDateTime;
+
+import org.eclipse.jdt.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReconciliationState {
+ private static final Logger LOG = LoggerFactory.getLogger(ReconciliationState.class);
+
+ public enum ReconciliationStatus {
+ STARTED,
+ COMPLETED,
+ FAILED
+ }
+
+ private ReconciliationStatus status;
+ private LocalDateTime time;
+
+ public ReconciliationState(@Nullable ReconciliationStatus status, LocalDateTime time) {
+ this.status = status;
+ this.time = time;
+ }
+
+ public ReconciliationStatus getState() {
+ return status;
+ }
+
+ public void setState(ReconciliationStatus staTus, LocalDateTime timing) {
+ this.status = staTus;
+ this.time = timing;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%-25s %-25s", this.status, this.time);
+ }
+}
+
--- /dev/null
+/*
+ * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.impl.services.cache;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.inject.Singleton;
+import org.apache.aries.blueprint.annotation.service.Service;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
+import org.opendaylight.openflowplugin.api.openflow.ReconciliationState;
+
+@Singleton
+@Service(classes = FlowGroupCacheManager.class)
+public class FlowGroupCacheManagerImpl implements FlowGroupCacheManager {
+
+ private Map<String, ReconciliationState> reconciliationStates = new ConcurrentHashMap<>();
+
+ @Override
+ public Map<String, ReconciliationState> getReconciliationStates() {
+ return reconciliationStates;
+ }
+}
\ No newline at end of file