import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Date;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.eclipse.osgi.framework.console.CommandProvider;
import org.opendaylight.controller.clustering.services.CacheConfigException;
import org.opendaylight.controller.clustering.services.CacheExistException;
-import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
import org.opendaylight.controller.clustering.services.IClusterContainerServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
import org.opendaylight.controller.configuration.IConfigurationContainerAware;
*/
public class ForwardingRulesManager implements IForwardingRulesManager, PortGroupChangeListener,
IContainerListener, ISwitchManagerAware, IConfigurationContainerAware, IInventoryListener, IObjectReader,
- ICacheUpdateAware<Long, String>, CommandProvider, IFlowProgrammerListener {
- private static final String SAVE = "Save";
+ CommandProvider, IFlowProgrammerListener {
private static final String NODEDOWN = "Node is Down";
private static final String SUCCESS = StatusCode.SUCCESS.toString();
private static final Logger log = LoggerFactory.getLogger(ForwardingRulesManager.class);
- private Map<Long, String> flowsSaveEvent;
+ private static final String PORTREMOVED = "Port removed";
private String frmFileName;
private String portGroupFileName;
private ConcurrentMap<Integer, FlowConfig> staticFlows;
private ConcurrentMap<FlowEntry, FlowEntry> inactiveFlows;
private IContainer container;
- private Set<IForwardingRulesManagerAware> frmAware;
+ private Set<IForwardingRulesManagerAware> frmAware =
+ Collections.synchronizedSet(new HashSet<IForwardingRulesManagerAware>());
private PortGroupProvider portGroupProvider;
private IFlowProgrammerService programmer;
private IClusterContainerServices clusterContainerService = null;
log.warn(logMsg, groupName);
return new Status(StatusCode.NOTACCEPTABLE, msg);
}
- int toBeRemoved = groupFlows.get(groupName).size();
+ int toBeRemoved = 0;
String error = "";
if (groupFlows.containsKey(groupName)) {
List<FlowEntryInstall> list = new ArrayList<FlowEntryInstall>(groupFlows.get(groupName));
+ toBeRemoved = list.size();
for (FlowEntryInstall entry : list) {
Status status = this.removeEntry(entry.getOriginal(), false);
if (status.isSuccess()) {
portGroupConfigs = new ConcurrentHashMap<String, PortGroupConfig>();
portGroupData = new ConcurrentHashMap<PortGroupConfig, Map<Node, PortGroup>>();
staticFlows = new ConcurrentHashMap<Integer, FlowConfig>();
- flowsSaveEvent = new HashMap<Long, String>();
inactiveFlows = new ConcurrentHashMap<FlowEntry, FlowEntry>();
}
return list;
}
+ @Override
+ public List<FlowEntry> getInstalledFlowEntriesForGroup(String policyName) {
+ List<FlowEntry> list = new ArrayList<FlowEntry>();
+ if (policyName != null && !policyName.trim().isEmpty()) {
+ for (Map.Entry<FlowEntryInstall, FlowEntryInstall> entry : this.installedSwView.entrySet()) {
+ if (policyName.equals(entry.getKey().getGroupName())) {
+ list.add(entry.getKey().getInstall().clone());
+ }
+ }
+ }
+ return list;
+ }
+
@Override
public void addOutputPort(Node node, String flowName, List<NodeConnector> portList) {
try {
clusterContainerService.createCache("frm.originalSwView",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.installedSwView",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.inactiveFlows",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.nodeFlows",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.groupFlows",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.staticFlows",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.flowsSaveEvent",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.staticFlowsOrdinal",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.portGroupConfigs",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.portGroupData",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.TSPolicies",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
} catch (CacheConfigException cce) {
log.error("CacheConfigException");
log.error("Retrieval of frm.staticFlows cache failed for Container {}", container.getName());
}
- map = clusterContainerService.getCache("frm.flowsSaveEvent");
- if (map != null) {
- flowsSaveEvent = (ConcurrentMap<Long, String>) map;
- } else {
- log.error("Retrieval of frm.flowsSaveEvent cache failed for Container {}", container.getName());
- }
-
map = clusterContainerService.getCache("frm.staticFlowsOrdinal");
if (map != null) {
staticFlowsOrdinal = (ConcurrentMap<Integer, Integer>) map;
@Override
public Status saveConfig() {
- // Publish the save config event to the cluster nodes
- flowsSaveEvent.put(new Date().getTime(), SAVE);
return saveConfigInternal();
}
return new Status(StatusCode.SUCCESS, null);
}
- @Override
- public void entryCreated(Long key, String cacheName, boolean local) {
- }
-
- @Override
- public void entryUpdated(Long key, String new_value, String cacheName, boolean originLocal) {
- saveConfigInternal();
- }
-
- @Override
- public void entryDeleted(Long key, String cacheName, boolean originLocal) {
- }
-
@Override
public void subnetNotify(Subnet sub, boolean add) {
}
log.info("Forwarding mode for node {} set to {}", node, (proactive ? "proactive" : "reactive"));
for (FlowConfig fc : defaultConfigs) {
- Status status = (proactive) ? addStaticFlowInternal(fc, true) : removeStaticFlow(fc);
+ Status status = (proactive) ? addStaticFlowInternal(fc, false) : removeStaticFlow(fc);
if (status.isSuccess()) {
log.info("{} Proactive Static flow: {}", (proactive ? "Installed" : "Removed"), fc.getName());
} else {
}
}
+ private boolean doesFlowContainNodeConnector(Flow flow, NodeConnector nc) {
+ if (nc == null) {
+ return false;
+ }
+
+ Match match = flow.getMatch();
+ if (match.isPresent(MatchType.IN_PORT)) {
+ NodeConnector matchPort = (NodeConnector) match.getField(MatchType.IN_PORT).getValue();
+ if (matchPort.equals(nc)) {
+ return true;
+ }
+ }
+ List<Action> actionsList = flow.getActions();
+ if (actionsList != null) {
+ for (Action action : actionsList) {
+ if (action instanceof Output) {
+ NodeConnector actionPort = ((Output) action).getPort();
+ if (actionPort.equals(nc)) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
@Override
public void notifyNode(Node node, UpdateType type, Map<String, Property> propMap) {
this.pendingEvents.offer(new NodeUpdateEvent(type, node));
*
*/
void init() {
- frmAware = Collections.synchronizedSet(new HashSet<IForwardingRulesManagerAware>());
frmFileName = GlobalConstants.STARTUPHOME.toString() + "frm_staticflows_" + this.getContainerName() + ".conf";
portGroupFileName = GlobalConstants.STARTUPHOME.toString() + "portgroup_" + this.getContainerName() + ".conf";
*
*/
void destroy() {
+ frmAware.clear();
}
/**
}
@Override
- public void nodeConnectorUpdated(String containerName, NodeConnector p, UpdateType t) {
+ public void nodeConnectorUpdated(String containerName, NodeConnector nc, UpdateType t) {
if (!container.getName().equals(containerName)) {
return;
}
+
+ boolean updateStaticFlowCluster = false;
+
+ switch (t) {
+ case REMOVED:
+
+ List<FlowEntryInstall> nodeFlowEntries = nodeFlows.get(nc.getNode());
+ if (nodeFlowEntries == null) {
+ return;
+ }
+ for (FlowEntryInstall fei : new ArrayList<FlowEntryInstall>(nodeFlowEntries)) {
+ if (doesFlowContainNodeConnector(fei.getInstall().getFlow(), nc)) {
+ Status status = this.removeEntryInternal(fei, true);
+ if (!status.isSuccess()) {
+ continue;
+ }
+ /*
+ * If the flow entry is a static flow, then update its
+ * configuration
+ */
+ if (fei.getGroupName().equals(FlowConfig.STATICFLOWGROUP)) {
+ FlowConfig flowConfig = getStaticFlow(fei.getFlowName(), fei.getNode());
+ if (flowConfig != null) {
+ flowConfig.setStatus(PORTREMOVED);
+ updateStaticFlowCluster = true;
+ }
+ }
+ }
+ }
+ if (updateStaticFlowCluster) {
+ refreshClusterStaticFlowsStatus(nc.getNode());
+ }
+ break;
+ case ADDED:
+ List<FlowConfig> flowConfigForNode = getStaticFlows(nc.getNode());
+ for (FlowConfig flowConfig : flowConfigForNode) {
+ if (doesFlowContainNodeConnector(flowConfig.getFlow(), nc)) {
+ if (flowConfig.installInHw()) {
+ Status status = this.installFlowEntry(flowConfig.getFlowEntry());
+ if (!status.isSuccess()) {
+ flowConfig.setStatus(status.getDescription());
+ } else {
+ flowConfig.setStatus(SUCCESS);
+ }
+ updateStaticFlowCluster = true;
+ }
+ }
+ }
+ if (updateStaticFlowCluster) {
+ refreshClusterStaticFlowsStatus(nc.getNode());
+ }
+ break;
+ case CHANGED:
+ break;
+ default:
+ }
}
@Override