package org.opendaylight.controller.forwardingrulesmanager.consumer.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.opendaylight.controller.clustering.services.CacheConfigException;
import org.opendaylight.controller.clustering.services.CacheExistException;
import org.opendaylight.controller.clustering.services.IClusterContainerServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
import org.opendaylight.controller.md.sal.common.api.data.DataModification;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.controller.sal.core.IContainer;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.utils.GlobalConstants;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.config.rev131024.Meters;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.config.rev131024.meters.Meter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.config.rev131024.meters.MeterKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAdded;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterRemoved;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.meter.update.UpdatedMeterBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.band.type.BandType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.band.type.band.type.Drop;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.band.type.band.type.DscpRemark;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.band.type.band.type.Experimenter;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.NotificationListener;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Forwarding-rules-manager consumer for meters.
 *
 * <p>On construction it looks up the SAL meter RPC service, registers a
 * notification listener for meter events and registers a commit handler on the
 * {@code Meters/Meter} configuration path. Committed additions, updates and
 * removals are validated and forwarded to {@link SalMeterService}.
 *
 * <p>NOTE(review): the generic type arguments in this file were reconstructed
 * from how the values are used (keys are {@link MeterKey}, values are
 * {@link Meter}); confirm them against the project interfaces.
 */
public class MeterConsumerImpl implements IForwardingRulesManager {
    protected static final Logger logger = LoggerFactory.getLogger(MeterConsumerImpl.class);

    private final MeterEventListener meterEventListener = new MeterEventListener();
    private Registration<NotificationListener> meterListener;
    private SalMeterService meterService;
    private MeterDataCommitHandler commitHandler;

    // Meter views. Nothing in this class currently writes to them (see the TODOs in
    // addMeter/updateMeter/removeMeter); only originalSwMeterView is read.
    private ConcurrentMap<MeterKey, Meter> originalSwMeterView;
    @SuppressWarnings("unused")
    private ConcurrentMap<MeterKey, Meter> installedSwMeterView;
    @SuppressWarnings("unused")
    private ConcurrentMap<Node, List<Meter>> nodeMeters;
    @SuppressWarnings("unused")
    private ConcurrentMap<MeterKey, Meter> inactiveMeters;
    @SuppressWarnings("unused")
    private IContainer container;
    private IClusterContainerServices clusterMeterContainerService = null;

    /**
     * Wires this consumer to the services exposed by {@code FRMConsumerImpl}.
     *
     * <p>Construction stops early (leaving the commit handler unregistered) when the
     * SAL meter service is unavailable or the notification listener cannot be
     * registered. A cache start-up failure is only logged.
     */
    public MeterConsumerImpl() {
        InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder(Meters.class)
                .child(Meter.class).toInstance();
        meterService = FRMConsumerImpl.getProviderSession().getRpcService(SalMeterService.class);
        clusterMeterContainerService = FRMConsumerImpl.getClusterContainerService();
        container = FRMConsumerImpl.getContainer();

        if (!cacheStartup()) {
            logger.error("Unable to allocate/retrieve meter cache");
        }

        if (null == meterService) {
            logger.error("Consumer SAL Meter Service is down or NULL. FRM may not function as intended");
            return;
        }

        // For switch/plugin events
        meterListener = FRMConsumerImpl.getNotificationService().registerNotificationListener(meterEventListener);
        if (null == meterListener) {
            logger.error("Failed to register listener for meter data modification events");
            return;
        }

        commitHandler = new MeterDataCommitHandler();
        FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
    }

    /**
     * Creates the four transactional meter caches in the cluster container service.
     *
     * @return {@code false} when the cluster service is missing or rejects the cache
     *         configuration; {@code true} otherwise, including when a cache already exists
     */
    private boolean allocateMeterCaches() {
        if (this.clusterMeterContainerService == null) {
            logger.warn("Meter: Un-initialized clusterMeterContainerService, can't create cache");
            return false;
        }

        try {
            clusterMeterContainerService.createCache("frm.originalSwMeterView",
                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
            clusterMeterContainerService.createCache("frm.installedSwMeterView",
                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
            clusterMeterContainerService.createCache("frm.inactiveMeters",
                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
            clusterMeterContainerService.createCache("frm.nodeMeters",
                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
            // TODO for cluster mode: also create the non-transactional, asynchronous
            // WORK_STATUS_CACHE and WORK_ORDER_CACHE.
        } catch (CacheConfigException cce) {
            logger.error("Meter CacheConfigException", cce);
            return false;
        } catch (CacheExistException cce) {
            // An already existing cache is not fatal; it is picked up by retrieveMeterCaches().
            logger.error(" Meter CacheExistException", cce);
        }

        return true;
    }

    /** Creates plain in-memory maps for use when no cluster service is available. */
    private void nonClusterMeterObjectCreate() {
        originalSwMeterView = new ConcurrentHashMap<MeterKey, Meter>();
        installedSwMeterView = new ConcurrentHashMap<MeterKey, Meter>();
        nodeMeters = new ConcurrentHashMap<Node, List<Meter>>();
        inactiveMeters = new ConcurrentHashMap<MeterKey, Meter>();
    }

    /**
     * Binds the meter view fields to the caches held by the cluster container service.
     *
     * @return {@code true} only when all four caches were found
     */
    @SuppressWarnings({ "unchecked" })
    private boolean retrieveMeterCaches() {
        ConcurrentMap<?, ?> map;

        if (this.clusterMeterContainerService == null) {
            logger.warn("Meter: un-initialized clusterMeterContainerService, can't retrieve cache");
            nonClusterMeterObjectCreate();
            return false;
        }

        map = clusterMeterContainerService.getCache("frm.originalSwMeterView");
        if (map == null) {
            logger.error("Retrieval of cache(originalSwMeterView) failed");
            return false;
        }
        originalSwMeterView = (ConcurrentMap<MeterKey, Meter>) map;

        map = clusterMeterContainerService.getCache("frm.installedSwMeterView");
        if (map == null) {
            logger.error("Retrieval of cache(installedSwMeterView) failed");
            return false;
        }
        installedSwMeterView = (ConcurrentMap<MeterKey, Meter>) map;

        map = clusterMeterContainerService.getCache("frm.inactiveMeters");
        if (map == null) {
            logger.error("Retrieval of cache(inactiveMeters) failed");
            return false;
        }
        inactiveMeters = (ConcurrentMap<MeterKey, Meter>) map;

        map = clusterMeterContainerService.getCache("frm.nodeMeters");
        if (map == null) {
            logger.error("Retrieval of cache(nodeMeter) failed");
            return false;
        }
        nodeMeters = (ConcurrentMap<Node, List<Meter>>) map;

        return true;
    }

    /**
     * Allocates and then retrieves the meter caches.
     *
     * <p>When allocation fails because there is no cluster service, the local
     * in-memory maps are created so that the views are never left {@code null};
     * the method still reports failure in that case.
     *
     * @return {@code true} when the clustered caches were allocated and retrieved
     */
    private boolean cacheStartup() {
        if (!allocateMeterCaches()) {
            if (clusterMeterContainerService == null) {
                // allocateMeterCaches() fails before retrieveMeterCaches() could apply its
                // own non-cluster fallback, so apply it here.
                nonClusterMeterObjectCreate();
            }
            return false;
        }
        return retrieveMeterCaches();
    }

    /**
     * Validates a meter and, when it is flagged for installation, sends an add-meter
     * request to the SAL meter service.
     *
     * @param path identifier of the committed data (currently unused)
     * @param meterAddDataObject the meter to add
     * @return {@code SUCCESS}, or {@code BADREQUEST} when the key is missing or validation fails
     */
    private Status addMeter(InstanceIdentifier<?> path, Meter meterAddDataObject) {
        MeterKey meterKey = meterAddDataObject.getKey();

        if (null == meterKey || !validateMeter(meterAddDataObject, FRMUtil.operation.ADD).isSuccess()) {
            return new Status(StatusCode.BADREQUEST, "Meter Key or attribute validation failed");
        }

        if (meterAddDataObject.isInstall()) {
            AddMeterInputBuilder meterBuilder = new AddMeterInputBuilder();
            meterBuilder.setContainerName(meterAddDataObject.getContainerName());
            meterBuilder.setFlags(meterAddDataObject.getFlags());
            meterBuilder.setMeterBandHeaders(meterAddDataObject.getMeterBandHeaders());
            meterBuilder.setMeterId(meterAddDataObject.getMeterId());
            meterBuilder.setNode(meterAddDataObject.getNode());
            meterService.addMeter(meterBuilder.build());
        }
        // TODO: record the meter in originalSwMeterView once the internal views are maintained.

        return new Status(StatusCode.SUCCESS);
    }

    /**
     * Validates a meter and, when it is flagged for installation, sends an
     * update-meter request to the SAL meter service.
     *
     * @param path identifier of the committed data (currently unused)
     * @param meterUpdateDataObject the new state of the meter
     * @return {@code SUCCESS}, or {@code BADREQUEST} when the key is missing or validation fails
     */
    private Status updateMeter(InstanceIdentifier<?> path, Meter meterUpdateDataObject) {
        MeterKey meterKey = meterUpdateDataObject.getKey();

        if (null == meterKey || !validateMeter(meterUpdateDataObject, FRMUtil.operation.UPDATE).isSuccess()) {
            return new Status(StatusCode.BADREQUEST, "Meter Key or attribute validation failed");
        }

        // TODO: refresh originalSwMeterView/installedSwMeterView once the internal views are maintained.
        if (meterUpdateDataObject.isInstall()) {
            UpdatedMeterBuilder updateMeterBuilder = new UpdatedMeterBuilder();
            updateMeterBuilder.fieldsFrom(meterUpdateDataObject);

            UpdateMeterInputBuilder updateMeterInputBuilder = new UpdateMeterInputBuilder();
            updateMeterInputBuilder.setUpdatedMeter(updateMeterBuilder.build());
            meterService.updateMeter(updateMeterInputBuilder.build());
        }

        return new Status(StatusCode.SUCCESS);
    }

    /**
     * Validates a meter and, when it is flagged for installation, sends a
     * remove-meter request to the SAL meter service.
     *
     * @param path identifier of the removed data (currently unused)
     * @param meterRemoveDataObject the meter as it was before removal
     * @return {@code SUCCESS}, or {@code BADREQUEST} when the key is missing or validation fails
     */
    private Status removeMeter(InstanceIdentifier<?> path, Meter meterRemoveDataObject) {
        MeterKey meterKey = meterRemoveDataObject.getKey();

        if (null == meterKey || !validateMeter(meterRemoveDataObject, FRMUtil.operation.DELETE).isSuccess()) {
            return new Status(StatusCode.BADREQUEST, "Meter Key or attribute validation failed");
        }

        if (meterRemoveDataObject.isInstall()) {
            RemoveMeterInputBuilder meterBuilder = new RemoveMeterInputBuilder();
            meterBuilder.setContainerName(meterRemoveDataObject.getContainerName());
            meterBuilder.setNode(meterRemoveDataObject.getNode());
            meterBuilder.setFlags(meterRemoveDataObject.getFlags());
            meterBuilder.setMeterBandHeaders(meterRemoveDataObject.getMeterBandHeaders());
            meterBuilder.setMeterId(meterRemoveDataObject.getMeterId());
            meterService.removeMeter(meterBuilder.build());
        }
        // TODO: drop the meter from the internal views once they are maintained.

        return new Status(StatusCode.SUCCESS);
    }

    /**
     * Checks the container name, meter name and band headers of a meter.
     *
     * <p>A {@code null} meter is reported as {@code SUCCESS}, as before.
     * NOTE(review): confirm whether a {@code null} meter should be rejected instead.
     *
     * @param meter the meter to validate
     * @param operation the operation being performed (currently not consulted;
     *        TODO: re-introduce the duplicate/missing-record check per operation)
     * @return {@code SUCCESS}, or {@code BADREQUEST} with a reason when a name is invalid
     *         or a band carries a burst size while the burst flag is not set
     */
    public Status validateMeter(Meter meter, FRMUtil.operation operation) {
        if (null == meter) {
            return new Status(StatusCode.SUCCESS);
        }

        // A missing container name is accepted; only an explicit invalid one is rejected.
        String containerName = meter.getContainerName();
        if (null != containerName && !FRMUtil.isNameValid(containerName)) {
            logger.error("Container Name is invalid {}", containerName);
            return new Status(StatusCode.BADREQUEST, "Container Name is invalid");
        }

        String meterName = meter.getMeterName();
        if (!FRMUtil.isNameValid(meterName)) {
            logger.error("Meter Name is invalid {}", meterName);
            return new Status(StatusCode.BADREQUEST, "Meter Name is invalid");
        }

        // Treat absent band headers as "no bands" instead of failing with an NPE.
        int bandCount = 0;
        if (null != meter.getMeterBandHeaders() && null != meter.getMeterBandHeaders().getMeterBandHeader()) {
            bandCount = meter.getMeterBandHeaders().getMeterBandHeader().size();
        }

        // Absent flags are treated as "burst flag not set".
        boolean burstEnabled = null != meter.getFlags() && Boolean.TRUE.equals(meter.getFlags().isMeterBurst());
        if (!burstEnabled) {
            for (int i = 0; i < bandCount; i++) {
                if (0 < meter.getMeterBandHeaders().getMeterBandHeader().get(i).getBurstSize()) {
                    logger.error("Burst size should only be associated when Burst FLAG is set");
                    return new Status(StatusCode.BADREQUEST,
                            "Burst size should only be associated when Burst FLAG is set");
                }
            }
        }

        // Band-type checks; these only log and never change the result.
        // No validation is applied to DSCP-remark rates or experimenter bands yet.
        DscpRemark dscpRemark = null;
        for (int i = 0; i < bandCount; i++) {
            BandType bandType = meter.getMeterBandHeaders().getMeterBandHeader().get(i).getBandType();
            if (bandType instanceof DscpRemark) {
                dscpRemark = (DscpRemark) bandType;
            } else if (bandType instanceof Drop) {
                // dscpRemark is only set when an earlier band was a DSCP remark; without
                // the null guard a leading Drop band caused a NullPointerException.
                // NOTE(review): checking the DSCP precedence level on a Drop band looks
                // misplaced - confirm the intended rule.
                if (null != dscpRemark && 0 < dscpRemark.getPercLevel()) {
                    logger.error("Number of drop Precedence level");
                }
            }
        }

        return new Status(StatusCode.SUCCESS);
    }

    /**
     * Two-phase commit transaction that sorts a data modification into meter
     * additions, updates and removals and pushes them to the plugin on {@link #finish()}.
     */
    private final class InternalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {

        private final DataModification<InstanceIdentifier<?>, DataObject> modification;

        Map<InstanceIdentifier<?>, Meter> additions = new HashMap<>();
        Map<InstanceIdentifier<?>, Meter> updates = new HashMap<>();
        Set<InstanceIdentifier<?>> removals = new HashSet<>();

        public InternalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
            this.modification = modification;
        }

        @Override
        public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
            return modification;
        }

        /**
         * Builds the plan of which meters will be added, which will be updated and
         * which will be removed, based on the modification.
         */
        void prepareUpdate() {
            for (Entry<InstanceIdentifier<?>, DataObject> entry
                    : modification.getCreatedConfigurationData().entrySet()) {
                if (entry.getValue() instanceof Meter) {
                    additions.put(entry.getKey(), (Meter) entry.getValue());
                }
            }

            for (Entry<InstanceIdentifier<?>, DataObject> entry
                    : modification.getUpdatedConfigurationData().entrySet()) {
                // Skip entries already planned as additions. This will be fixed once
                // getUpdatedConfigurationData returns only updated data, not created data with it.
                if (entry.getValue() instanceof Meter && !additions.containsKey(entry.getKey())) {
                    updates.put(entry.getKey(), (Meter) entry.getValue());
                }
            }

            removals = modification.getRemovedConfigurationData();
        }

        /**
         * Executes the plan by committing it to the plugin.
         *
         * @return the result reported by {@code commitToPlugin}
         */
        @Override
        public RpcResult<Void> finish() throws IllegalStateException {
            return commitToPlugin(this);
        }

        /**
         * Rolls back the preparation. This is a no-op because no internal state is
         * modified during the requestCommit phase.
         *
         * @return always a successful result
         */
        @Override
        public RpcResult<Void> rollback() throws IllegalStateException {
            return Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
        }
    }

    /**
     * Applies the planned additions, updates and removals in that order, stopping at
     * the first operation that does not succeed.
     *
     * @param transaction the prepared transaction
     * @return a successful result when every operation succeeded, otherwise a failed one
     */
    private RpcResult<Void> commitToPlugin(InternalTransaction transaction) {
        for (Entry<InstanceIdentifier<?>, Meter> entry : transaction.additions.entrySet()) {
            if (!addMeter(entry.getKey(), entry.getValue()).isSuccess()) {
                return Rpcs.getRpcResult(false, null, Collections.<RpcError>emptySet());
            }
        }

        for (Entry<InstanceIdentifier<?>, Meter> entry : transaction.updates.entrySet()) {
            if (!updateMeter(entry.getKey(), entry.getValue()).isSuccess()) {
                return Rpcs.getRpcResult(false, null, Collections.<RpcError>emptySet());
            }
        }

        for (InstanceIdentifier<?> meterId : transaction.removals) {
            // The removed object is no longer in the new data; read it from the original snapshot.
            DataObject removeValue = transaction.getModification().getOriginalConfigurationData().get(meterId);
            if (removeValue instanceof Meter && !removeMeter(meterId, (Meter) removeValue).isSuccess()) {
                return Rpcs.getRpcResult(false, null, Collections.<RpcError>emptySet());
            }
        }

        return Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
    }

    /** Commit handler that turns each requested commit into a prepared {@link InternalTransaction}. */
    private final class MeterDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
        @Override
        public DataCommitTransaction<InstanceIdentifier<?>, DataObject> requestCommit(
                DataModification<InstanceIdentifier<?>, DataObject> modification) {
            // We should verify transaction
            logger.debug("Coming in MeterDataCommitHandler");
            InternalTransaction transaction = new InternalTransaction(modification);
            transaction.prepareUpdate();
            return transaction;
        }
    }

    /** Listener for meter notifications from the switch/plugin; the callbacks are not implemented yet. */
    final class MeterEventListener implements SalMeterListener {

        List<MeterAdded> addedMeter = new ArrayList<>();
        List<MeterRemoved> removeMeter = new ArrayList<>();
        List<MeterUpdated> updatedMeter = new ArrayList<>();

        @Override
        public void onMeterAdded(MeterAdded notification) {
            // TODO Auto-generated method stub
        }

        @Override
        public void onMeterRemoved(MeterRemoved notification) {
            // TODO Auto-generated method stub
        }

        @Override
        public void onMeterUpdated(MeterUpdated notification) {
            // TODO Auto-generated method stub
        }
    }

    /**
     * Returns a snapshot of the meters held in the original meter view.
     *
     * @return a new mutable list; empty when the view was never initialized
     */
    @Override
    public List<DataObject> get() {
        if (null == originalSwMeterView) {
            return new ArrayList<DataObject>();
        }
        return new ArrayList<DataObject>(originalSwMeterView.values());
    }

    /**
     * Finds a meter in the original meter view by name and node.
     *
     * @param name the meter name to look for
     * @param n the node the meter must be attached to
     * @return the first matching meter, or {@code null} when none matches or the
     *         view was never initialized
     */
    @Override
    public DataObject getWithName(String name, Node n) {
        if (null == originalSwMeterView) {
            return null;
        }

        for (Meter meter : originalSwMeterView.values()) {
            if (null != meter.getNode() && meter.getNode().equals(n)
                    && null != meter.getMeterName() && meter.getMeterName().equals(name)) {
                return meter;
            }
        }
        return null;
    }
}