X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Fforwardingrules-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Ffrm%2Fimpl%2FFlowNodeReconciliationImpl.java;h=909f4c1423b3113cea57c6c820cccc44aff45c71;hb=eaf84d7364d73342346682d4604fe823410e977c;hp=e372ff557dfbc88b11c8da1e802cead2b75b5ec5;hpb=3822d4e24cfcedf33e36eedeb8fb683296f6aed4;p=openflowplugin.git diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java index e372ff557d..909f4c1423 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java @@ -8,7 +8,10 @@ package org.opendaylight.openflowplugin.applications.frm.impl; +import java.math.BigInteger; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -22,8 +25,6 @@ import com.google.common.util.concurrent.Futures; import java.util.concurrent.Callable; import org.opendaylight.controller.md.sal.binding.api.*; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation; @@ -33,7 +34,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.acti import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.OutputActionCase; import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterBuilder; @@ -55,15 +55,12 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroup; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroupKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId; import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures; import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.slf4j.Logger; @@ -94,6 +91,9 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { private ListenerRegistration listenerRegistration; + private final int THREAD_POOL_SIZE = 4; + ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE); + private static final InstanceIdentifier II_TO_FLOW_CAPABLE_NODE = InstanceIdentifier.builder(Nodes.class) .child(Node.class) @@ -220,40 +220,49 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { connectedNode.toString()); reconciliationPreProcess(connectedNode); } - reconciliation(connectedNode); + ReconciliationTask reconciliationTask = new ReconciliationTask(connectedNode); + executor.execute(reconciliationTask); } } - private void reconciliation(final InstanceIdentifier nodeIdent) { + private class ReconciliationTask implements Runnable { - String sNode = nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId().getValue(); - long nDpId = getDpnIdFromNodeName(sNode); + InstanceIdentifier nodeIdentity; - ReadOnlyTransaction trans = provider.getReadTranaction(); - Optional flowNode = Optional.absent(); - - AtomicInteger counter = new AtomicInteger(); - //initialize the counter - counter.set(0); - try { - flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdent).get(); - } - catch (Exception e) { - LOG.error("Fail with read Config/DS for Node {} !", nodeIdent, e); + public ReconciliationTask(final InstanceIdentifier nodeIdent) { + nodeIdentity = nodeIdent; } - if (flowNode.isPresent()) { - /* Tables - have to be pushed before groups */ - // CHECK if while pusing the update, updateTableInput can be null to emulate a table add - List tableList = flowNode.get().getTableFeatures() != null - ? flowNode.get().getTableFeatures() : Collections. emptyList() ; - for (TableFeatures tableFeaturesItem : tableList) { - TableFeaturesKey tableKey = tableFeaturesItem.getKey(); - KeyedInstanceIdentifier tableFeaturesII - = nodeIdent.child(TableFeatures.class, new TableFeaturesKey(tableKey.getTableId())); - provider.getTableFeaturesCommiter().update(tableFeaturesII, tableFeaturesItem, null, nodeIdent); + @Override + public void run() { + + String sNode = nodeIdentity.firstKeyOf(Node.class, NodeKey.class).getId().getValue(); + BigInteger nDpId = getDpnIdFromNodeName(sNode); + + ReadOnlyTransaction trans = provider.getReadTranaction(); + Optional flowNode = Optional.absent(); + + AtomicInteger counter = new AtomicInteger(); + //initialize the counter + counter.set(0); + try { + flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdentity).get(); + } catch (Exception e) { + LOG.error("Fail with read Config/DS for Node {} !", nodeIdentity, e); } + if (flowNode.isPresent()) { + /* Tables - have to be pushed before groups */ + // CHECK if while pusing the update, updateTableInput can be null to emulate a table add + List tableList = flowNode.get().getTableFeatures() != null + ? flowNode.get().getTableFeatures() : Collections.emptyList(); + for (TableFeatures tableFeaturesItem : tableList) { + TableFeaturesKey tableKey = tableFeaturesItem.getKey(); + KeyedInstanceIdentifier tableFeaturesII + = nodeIdentity.child(TableFeatures.class, new TableFeaturesKey(tableKey.getTableId())); + provider.getTableFeaturesCommiter().update(tableFeaturesII, tableFeaturesItem, null, nodeIdentity); + } + /* Groups - have to be first */ List groups = flowNode.get().getGroup() != null ? flowNode.get().getGroup() : Collections.emptyList(); @@ -264,10 +273,10 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { List suspectedGroups = new ArrayList<>(); while ((!(toBeInstalledGroups.isEmpty()) || !(suspectedGroups.isEmpty())) && - (counter.get()<=provider.getConfiguration().getReconciliationRetryCount())) { //also check if the counter has not crossed the threshold + (counter.get() <= provider.getConfiguration().getReconciliationRetryCount())) { //also check if the counter has not crossed the threshold - if(toBeInstalledGroups.isEmpty() && ! suspectedGroups.isEmpty()){ - LOG.error("These Groups are pointing to node-connectors that are not up yet {}",suspectedGroups.toString()); + if (toBeInstalledGroups.isEmpty() && !suspectedGroups.isEmpty()) { + LOG.error("These Groups are pointing to node-connectors that are not up yet {}", suspectedGroups.toString()); toBeInstalledGroups.addAll(suspectedGroups); break; } @@ -278,27 +287,27 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { boolean okToInstall = true; for (Bucket bucket : group.getBuckets().getBucket()) { for (Action action : bucket.getAction()) { - //chained-port + //chained-port if (action.getAction().getImplementedInterface().getName() - .equals("org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.OutputActionCase")){ - String nodeConnectorUri = ((OutputActionCase)(action.getAction())) + .equals("org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.OutputActionCase")) { + String nodeConnectorUri = ((OutputActionCase) (action.getAction())) .getOutputAction().getOutputNodeConnector().getValue(); - LOG.warn("Installing the group for node connector {}",nodeConnectorUri); + LOG.warn("Installing the group for node connector {}", nodeConnectorUri); //check if the nodeconnector is there in the multimap boolean isPresent = provider.getFlowNodeConnectorInventoryTranslatorImpl() .isNodeConnectorUpdated(nDpId, nodeConnectorUri); //if yes set okToInstall = true - if(isPresent){ - break; + if (isPresent) { + break; }//else put it in a different list and still set okToInstall = true else { suspectedGroups.add(group); LOG.error("Not yet received the node-connector updated for {} " + - "for the group with id {}",nodeConnectorUri,group.getGroupId().toString()); - break; + "for the group with id {}", nodeConnectorUri, group.getGroupId().toString()); + break; } @@ -313,21 +322,20 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { } } } - if (!okToInstall){ + if (!okToInstall) { //increment retry counter value counter.incrementAndGet(); break; } - } if (okToInstall) { final KeyedInstanceIdentifier groupIdent = - nodeIdent.child(Group.class, group.getKey()); - this.provider.getGroupCommiter().add(groupIdent, group, nodeIdent); + nodeIdentity.child(Group.class, group.getKey()); + provider.getGroupCommiter().add(groupIdent, group, nodeIdentity); alreadyInstalledGroupids.add(group.getGroupId().getValue()); iterator.remove(); // resetting the counter to zero @@ -337,48 +345,48 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { } /* installation of suspected groups*/ - if(!toBeInstalledGroups.isEmpty()){ - for(Group group :toBeInstalledGroups){ - LOG.error("Installing the group {} finally although the port is not up after checking for {} times " - ,group.getGroupId().toString(),provider.getConfiguration().getReconciliationRetryCount()); - final KeyedInstanceIdentifier groupIdent = - nodeIdent.child(Group.class, group.getKey()); - this.provider.getGroupCommiter().add(groupIdent, group, nodeIdent); + if (!toBeInstalledGroups.isEmpty()) { + for (Group group : toBeInstalledGroups) { + LOG.error("Installing the group {} finally although the port is not up after checking for {} times " + , group.getGroupId().toString(), provider.getConfiguration().getReconciliationRetryCount()); + final KeyedInstanceIdentifier groupIdent = + nodeIdentity.child(Group.class, group.getKey()); + provider.getGroupCommiter().add(groupIdent, group, nodeIdentity); + } } - } /* Meters */ - List meters = flowNode.get().getMeter() != null - ? flowNode.get().getMeter() : Collections. emptyList(); - for (Meter meter : meters) { - final KeyedInstanceIdentifier meterIdent = - nodeIdent.child(Meter.class, meter.getKey()); - this.provider.getMeterCommiter().add(meterIdent, meter, nodeIdent); - } + List meters = flowNode.get().getMeter() != null + ? flowNode.get().getMeter() : Collections.emptyList(); + for (Meter meter : meters) { + final KeyedInstanceIdentifier meterIdent = + nodeIdentity.child(Meter.class, meter.getKey()); + provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity); + } /* Flows */ - List tables = flowNode.get().getTable() != null - ? flowNode.get().getTable() : Collections.
emptyList(); - for (Table table : tables) { - final KeyedInstanceIdentifier tableIdent = - nodeIdent.child(Table.class, table.getKey()); - List flows = table.getFlow() != null ? table.getFlow() : Collections. emptyList(); - for (Flow flow : flows) { - final KeyedInstanceIdentifier flowIdent = - tableIdent.child(Flow.class, flow.getKey()); - this.provider.getFlowCommiter().add(flowIdent, flow, nodeIdent); + List
tables = flowNode.get().getTable() != null + ? flowNode.get().getTable() : Collections.
emptyList(); + for (Table table : tables) { + final KeyedInstanceIdentifier tableIdent = + nodeIdentity.child(Table.class, table.getKey()); + List flows = table.getFlow() != null ? table.getFlow() : Collections.emptyList(); + for (Flow flow : flows) { + final KeyedInstanceIdentifier flowIdent = + tableIdent.child(Flow.class, flow.getKey()); + provider.getFlowCommiter().add(flowIdent, flow, nodeIdentity); + } } } - } /* clean transaction */ - trans.close(); + trans.close(); + } } - private long getDpnIdFromNodeName(String nodeName) { + private BigInteger getDpnIdFromNodeName(String nodeName) { String dpId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1); - return Long.parseLong(dpId); - } + return new BigInteger(dpId); + } private void reconciliationPreProcess(final InstanceIdentifier nodeIdent) { - List> staleFlowsToBeBulkDeleted = Lists.newArrayList(); List> staleGroupsToBeBulkDeleted = Lists.newArrayList(); List> staleMetersToBeBulkDeleted = Lists.newArrayList();