import com.google.common.base.Optional;
-import jline.internal.Log;
-
import java.math.BigInteger;
import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.genius.mdsalutil.MDSALUtil;
import org.opendaylight.ovsdb.utils.southbound.utils.SouthboundUtils;
+import org.opendaylight.unimgr.api.UnimgrDataTreeChangeListener;
import org.opendaylight.yang.gen.v1.http.metroethernetforum.org.ns.yang.mef.global.rev150526.MefGlobal;
import org.opendaylight.yang.gen.v1.http.metroethernetforum.org.ns.yang.mef.global.rev150526.mef.global.Profiles;
import org.opendaylight.yang.gen.v1.http.metroethernetforum.org.ns.yang.mef.global.rev150526.mef.global.bwp.flows.group.BwpFlow;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class UniQosManager {
- private static final Logger LOG = LoggerFactory.getLogger(UniQosManager.class);
+public class UniQosManager extends UnimgrDataTreeChangeListener<BwpFlow> {
+ private static final Logger Log = LoggerFactory.getLogger(UniQosManager.class);
private OdlInterfaceRpcService odlInterfaceRpcService;
private DataBroker dataBroker;
private final Long noLimit = 0l;
+ private final static String noProfile = "";
+ private ListenerRegistration<UniQosManager> bwListenerRegistration;
+
// key in first map is uniId, key in second map is logical portId
private ConcurrentHashMap<String, ConcurrentHashMap<String, BandwidthLimits>> uniPortBandwidthLimits;
// map of current values per uni
private ConcurrentHashMap<String, BandwidthLimits> uniBandwidthLimits;
-
+
private ConcurrentHashMap<String, BigInteger> uniToDpn;
public UniQosManager(final DataBroker dataBroker, OdlInterfaceRpcService odlInterfaceRpcService) {
+ super(dataBroker);
+
this.dataBroker = dataBroker;
this.odlInterfaceRpcService = odlInterfaceRpcService;
this.uniPortBandwidthLimits = new ConcurrentHashMap<>();
this.uniBandwidthLimits = new ConcurrentHashMap<>();
this.uniToDpn = new ConcurrentHashMap<>();
+ registerListener();
}
- public void mapUniPortBandwidthLimits(String uniId, String portId, Long maxKbps, Long maxBurstKb) {
- Log.info("Record rate limits for Uni {} port {} maxKbps {} maxBurstKb {}", uniId, portId, maxKbps, maxBurstKb);
- uniPortBandwidthLimits.putIfAbsent(uniId, new ConcurrentHashMap<>());
- ConcurrentHashMap<String, BandwidthLimits> uniMap = uniPortBandwidthLimits.get(uniId);
- uniMap.put(portId, new BandwidthLimits(maxKbps, maxBurstKb));
+ public void registerListener() {
+ try {
+ final DataTreeIdentifier<BwpFlow> dataTreeIid = new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
+ getBwFlowsInstanceIdentifier());
+ bwListenerRegistration = dataBroker.registerDataTreeChangeListener(dataTreeIid, this);
+ Log.info("UniQosManager created and registered");
+ } catch (final Exception e) {
+ Log.error("UniQosManager DataChange listener registration failed !", e);
+ throw new IllegalStateException("UniQosManager registration Listener failed.", e);
+ }
}
- public void mapUniPortBandwidthLimits(String uniId, String portId, Identifier45 bwProfile) {
+ public synchronized void mapUniPortBandwidthLimits(String uniId, String portId, Identifier45 bwProfile) {
Long maxKbps = noLimit;
Long maxBurstKb = noLimit;
if (bwProfile != null) {
-
Optional<BwpFlow> bwFlowOp = MdsalUtils.read(dataBroker, LogicalDatastoreType.CONFIGURATION,
getBwFlowInstanceIdentifier(bwProfile));
if (!bwFlowOp.isPresent()) {
Log.trace("Can't read bw profile {} for Uni {}", bwProfile, uniId);
- return;
+ } else {
+ // Kb per second
+ maxKbps = bwFlowOp.get().getCir().getValue();
+ // burst in bytes, ovs requires in Kb
+ maxBurstKb = bwFlowOp.get().getCbs().getValue() * 8 / 1024;
+ Log.info("Record rate limits for Uni {} Profile {}", uniId, bwProfile);
}
- // Kb per second
- maxKbps = bwFlowOp.get().getCir().getValue();
- // burst in bytes, ovs requires in Kb
- maxBurstKb = bwFlowOp.get().getCbs().getValue() * 8 / 1024;
- Log.info("Record rate limits for Uni {} Profile {}", uniId, bwProfile);
}
- mapUniPortBandwidthLimits(uniId, portId, maxKbps, maxBurstKb);
+
+ mapUniPortBandwidthLimits(uniId, portId, maxKbps, maxBurstKb, replaceNull(bwProfile));
}
- public void unMapUniPortBandwidthLimits(String uniId, String portId) {
- Log.debug("Delete rate limits for Uni {} port {}", uniId, portId);
+ private synchronized void mapUniPortBandwidthLimits(String uniId, String portId, Long maxKbps, Long maxBurstKb,
+ String profileName) {
+ Log.info("Record rate limits for Uni {} port {} maxKbps {} maxBurstKb {}", uniId, portId, maxKbps, maxBurstKb);
+ uniPortBandwidthLimits.putIfAbsent(uniId, new ConcurrentHashMap<>());
+ ConcurrentHashMap<String, BandwidthLimits> uniMap = uniPortBandwidthLimits.get(uniId);
+ uniMap.put(portId, new BandwidthLimits(maxKbps, maxBurstKb, profileName));
+ }
+
+ public void updateUni(Identifier45 uniId, Identifier45 bwProfile) {
+ String bwProfileSafe = replaceNull(bwProfile);
+ Log.info("Update rate limits for Uni {}", uniId.getValue());
+ ConcurrentHashMap<String, BandwidthLimits> uniMap = uniPortBandwidthLimits.get(uniId.getValue());
+ if (uniMap == null) {
+ Log.error("Trying to update limits for non-exsting uni {}", uniId.getValue());
+ return;
+ }
+ for (String portName : uniMap.keySet()) {
+ if (uniMap.get(portName).getProfileName().equals(bwProfileSafe)) {
+ continue;
+ }
+ if (bwProfile != null) {
+ mapUniPortBandwidthLimits(uniId.getValue(), portName, new Identifier45(bwProfileSafe));
+ } else {
+ unMapUniPortBandwidthLimits(uniId.getValue(), portName);
+ }
+ }
+ }
+
+ private void updateProfile(Identifier45 bwProfile) {
+ Log.info("Update rate limits for profile {}", bwProfile);
+ List<String> unisWithProfile = uniBandwidthLimits.entrySet().stream()
+ .filter(m -> m.getValue().profileName.equals(bwProfile.getValue())).map(m -> m.getKey())
+ .collect(Collectors.toList());
+
+ for (String uniId : unisWithProfile) {
+ ConcurrentHashMap<String, BandwidthLimits> uniMap = uniPortBandwidthLimits.get(uniId);
+ uniMap.forEach((k, v) -> {
+ mapUniPortBandwidthLimits(uniId, k, bwProfile);
+ });
+ }
+
+ for (String uniId : unisWithProfile) {
+ setUniBandwidthLimits(uniId);
+ }
+ }
+
+ public void deleteProfile(Identifier45 bwProfile) {
+ Log.info("Delete rate limits for profile {}", bwProfile);
+ List<String> unisWithProfile = uniBandwidthLimits.entrySet().stream()
+ .filter(m -> m.getValue().profileName.equals(bwProfile.getValue())).map(m -> m.getKey())
+ .collect(Collectors.toList());
+
+ for (String uniId : unisWithProfile) {
+ ConcurrentHashMap<String, BandwidthLimits> uniMap = uniPortBandwidthLimits.get(uniId);
+ uniMap.forEach((k, v) -> {
+ unMapUniPortBandwidthLimits(uniId, k, bwProfile.getValue());
+ });
+ }
+
+ for (String uniId : unisWithProfile) {
+ setUniBandwidthLimits(uniId);
+ }
+ }
+
+ public synchronized void unMapUniPortBandwidthLimits(String uniId, String portId) {
+ unMapUniPortBandwidthLimits(uniId, portId, noProfile);
+ }
+
+ public synchronized void unMapUniPortBandwidthLimits(String uniId, String portId, String profileTosave) {
+ Log.info("Delete rate limits for Uni {} port {}", uniId, portId);
ConcurrentHashMap<String, BandwidthLimits> uniMap = uniPortBandwidthLimits.get(uniId);
if (uniMap == null) {
Log.error("Trying to delete limits for non-exsting uni {}", uniId);
}
uniMap.remove(portId);
if (uniMap.isEmpty()) {
- uniMap.put(portId, new BandwidthLimits(noLimit, noLimit));
+ uniMap.put(portId, new BandwidthLimits(noLimit, noLimit, profileTosave));
}
}
public void setUniBandwidthLimits(Identifier45 uniIden) {
String uniId = uniIden.getValue();
+ setUniBandwidthLimits(uniId);
+ }
+
+ private synchronized void setUniBandwidthLimits(String uniId) {
if (!uniPortBandwidthLimits.containsKey(uniId)) {
Log.debug("Uni {} doesn't have rate limits configured", uniId);
return;
ConcurrentHashMap<String, BandwidthLimits> uniLimits) {
Long sumOfRate = noLimit;
Long sumOfBurst = noLimit;
+ String profileName = noProfile;
Boolean hasNullRate = false;
Boolean hasNullBurst = false;
if (uniLimits == null || uniLimits.keySet() == null) {
- return new BandwidthLimits(sumOfRate, sumOfBurst);
+ return new BandwidthLimits(sumOfRate, sumOfBurst, profileName);
}
for (BandwidthLimits v : uniLimits.values()) {
}
sumOfRate = sumOfRate + v.maxKbps;
sumOfBurst = sumOfBurst + v.maxBurstKb;
-
+ profileName = v.profileName;
}
if (hasNullRate) {
sumOfRate = noLimit;
} else if (hasNullBurst) {
sumOfBurst = noLimit;
}
- return new BandwidthLimits(sumOfRate, sumOfBurst);
+ return new BandwidthLimits(sumOfRate, sumOfBurst, profileName);
}
private void setPortBandwidthLimits(String uniId, String logicalPortId, Long maxKbps, Long maxBurstKb) {
- LOG.trace("Setting bandwidth limits {} {} on Port {}", maxKbps, maxBurstKb, logicalPortId);
+ Log.info("Setting bandwidth limits {} {} on Port {}", maxKbps, maxBurstKb, logicalPortId);
BigInteger dpId = BigInteger.ZERO;
- if(uniToDpn.containsKey(uniId)) {
+ if (uniToDpn.containsKey(uniId)) {
dpId = uniToDpn.get(uniId);
- } else {
+ } else {
dpId = getDpnForInterface(odlInterfaceRpcService, logicalPortId);
uniToDpn.put(uniId, dpId);
}
if (dpId.equals(BigInteger.ZERO)) {
- LOG.error("DPN ID for interface {} not found", logicalPortId);
+ Log.error("DPN ID for interface {} not found", logicalPortId);
return;
}
Optional<Node> bridgeNode = MDSALUtil.read(LogicalDatastoreType.OPERATIONAL,
bridgeRefEntry.getValue().firstIdentifierOf(Node.class), dataBroker);
if (bridgeNode == null) {
- LOG.error("Bridge ref for interface {} not found", logicalPortId);
+ Log.error("Bridge ref for interface {} not found", logicalPortId);
return;
}
String physicalPort = getPhysicalPortForUni(dataBroker, uniId);
if (physicalPort == null) {
- LOG.error("Physical port for interface {} not found", logicalPortId);
+ Log.error("Physical port for interface {} not found", logicalPortId);
return;
}
TerminationPoint tp = getTerminationPoint(bridgeNode.get(), physicalPort);
if (tp == null) {
- LOG.error("Termination point for port {} not found", physicalPort);
+ Log.error("Termination point for port {} not found", physicalPort);
return;
}
if (dpIdResult.isSuccessful()) {
nodeId = dpIdResult.getResult().getDpid();
} else {
- LOG.error("Could not retrieve DPN Id for interface {}", ifName);
+ Log.error("Could not retrieve DPN Id for interface {}", ifName);
}
} catch (NullPointerException | InterruptedException | ExecutionException e) {
- LOG.error("Exception when getting dpn for interface {}", ifName, e);
+ Log.error("Exception when getting dpn for interface {}", ifName, e);
}
return nodeId;
}
String parentInterfaceName = MefInterfaceUtils.getTrunkParentName(link);
return parentInterfaceName.split(":")[1];
} catch (Exception e) {
- LOG.error("Exception when getting physical port for Uni {}", uniId, e);
+ Log.error("Exception when getting physical port for Uni {}", uniId, e);
}
return nodeId;
}
return bwProfileInstanceIdentifierBuilder.build();
}
+ private static InstanceIdentifier<BwpFlow> getBwFlowsInstanceIdentifier() {
+ InstanceIdentifier.InstanceIdentifierBuilder<BwpFlow> bwProfileInstanceIdentifierBuilder = InstanceIdentifier
+ .builder(MefGlobal.class).child(Profiles.class).child(IngressBwpFlows.class).child(BwpFlow.class);
+ return bwProfileInstanceIdentifierBuilder.build();
+ }
+
private class BandwidthLimits {
private final Long maxKbps;
private final Long maxBurstKb;
+ private final String profileName;
- public BandwidthLimits(Long maxKbps, Long maxBurstKb) {
+ public BandwidthLimits(Long maxKbps, Long maxBurstKb, String profileName) {
this.maxKbps = replaceNull(maxKbps);
this.maxBurstKb = replaceNull(maxBurstKb);
+ this.profileName = profileName;
}
public Long getMaxKbps() {
return maxBurstKb;
}
+ public String getProfileName() {
+ return profileName;
+ }
+
private Long replaceNull(Long value) {
return (value == null) ? Long.valueOf(0) : value;
}
return UniQosManager.this;
}
}
+
+ private static String replaceNull(Identifier45 value) {
+ return (value == null) ? noProfile : value.getValue();
+ }
+
+ @Override
+ public void close() throws Exception {
+ bwListenerRegistration.close();
+ }
+
+ @Override
+ public void add(DataTreeModification<BwpFlow> newDataObject) {
+ if (newDataObject.getRootPath() != null && newDataObject.getRootNode() != null) {
+ Log.info("bw profile {} created", newDataObject.getRootNode().getIdentifier());
+ updateProfile(newDataObject.getRootNode().getDataAfter().getBwProfile());
+ }
+ }
+
+ @Override
+ public void remove(DataTreeModification<BwpFlow> removedDataObject) {
+ if (removedDataObject.getRootPath() != null && removedDataObject.getRootNode() != null) {
+ Log.info("bw profile {} deleted", removedDataObject.getRootNode().getIdentifier());
+ deleteProfile(removedDataObject.getRootNode().getDataBefore().getBwProfile());
+ }
+ }
+
+ @Override
+ public void update(DataTreeModification<BwpFlow> modifiedDataObject) {
+ if (modifiedDataObject.getRootPath() != null && modifiedDataObject.getRootNode() != null) {
+ Log.info("bw profile {} modified", modifiedDataObject.getRootNode().getIdentifier());
+ updateProfile(modifiedDataObject.getRootNode().getDataAfter().getBwProfile());
+ }
+ }
}