Added meter, action, group, flow models, mask and transactions support.
[controller.git] / opendaylight / forwardingrulesmanager_mdsal / openflow / src / main / java / org / opendaylight / controller / forwardingrulesmanager_mdsal / consumer / impl / MeterConsumerImpl.java
1 package org.opendaylight.controller.forwardingrulesmanager_mdsal.consumer.impl;
2
3 import java.util.ArrayList;
4 import java.util.EnumSet;
5 import java.util.HashMap;
6 import java.util.HashSet;
7 import java.util.List;
8 import java.util.Map;
9 import java.util.Set;
10 import java.util.Map.Entry;
11 import java.util.concurrent.ConcurrentHashMap;
12 import java.util.concurrent.ConcurrentMap;
13
14 import org.opendaylight.controller.clustering.services.CacheConfigException;
15 import org.opendaylight.controller.clustering.services.CacheExistException;
16 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
17 import org.opendaylight.controller.clustering.services.IClusterServices;
18 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
19 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
20 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
21 import org.opendaylight.controller.sal.common.util.Rpcs;
22 import org.opendaylight.controller.sal.core.IContainer;
23 import org.opendaylight.controller.sal.core.Node;
24 import org.opendaylight.controller.sal.utils.GlobalConstants;
25 import org.opendaylight.controller.sal.utils.Status;
26 import org.opendaylight.controller.sal.utils.StatusCode;
27 import org.opendaylight.controller.switchmanager.ISwitchManager;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.config.rev131024.Meters;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.config.rev131024.meters.MeterKey;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAdded;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterRemoved;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdated;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterListener;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterInputBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.band.type.BandType;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.band.type.band.type.Drop;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.band.type.band.type.DscpRemark;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.band.type.band.type.Experimenter;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.config.rev131024.meters.Meter;
42 import org.opendaylight.yangtools.concepts.Registration;
43 import org.opendaylight.yangtools.yang.binding.DataObject;
44 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
45 import org.opendaylight.yangtools.yang.binding.NotificationListener;
46 import org.opendaylight.yangtools.yang.common.RpcResult;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 public class MeterConsumerImpl {
51     protected static final Logger logger = LoggerFactory.getLogger(MeterConsumerImpl.class);
52     private MeterEventListener meterEventListener = new MeterEventListener();
53     private Registration<NotificationListener> meterListener;
54     private SalMeterService meterService;    
55     private MeterDataCommitHandler commitHandler;
56     
57     private ConcurrentMap<MeterKey, Meter> originalSwMeterView;
58     private ConcurrentMap<MeterKey, Meter> installedSwMeterView;
59     
60     private ConcurrentMap<Node, List<Meter>> nodeMeters;
61     private ConcurrentMap<MeterKey, Meter> inactiveMeters;
62     
63     private IClusterContainerServices clusterMeterContainerService = null; 
64     private IContainer container;
65     
66     public MeterConsumerImpl() {
67         InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder().node(Meters.class).node(Meter.class).toInstance();
68         meterService = FRMConsumerImpl.getProviderSession().getRpcService(SalMeterService.class);        
69         clusterMeterContainerService = FRMConsumerImpl.getClusterContainerService();
70         
71         container = FRMConsumerImpl.getContainer();
72         
73         if (!(cacheStartup())) {
74             logger.error("Unable to allocate/retrieve meter cache");
75             System.out.println("Unable to allocate/retrieve meter cache");
76         }
77         
78         if (null == meterService) {
79             logger.error("Consumer SAL Meter Service is down or NULL. FRM may not function as intended");
80             System.out.println("Consumer SAL Meter Service is down or NULL.");
81             return;
82         } 
83         
84         // For switch/plugin events
85         meterListener = FRMConsumerImpl.getNotificationService().registerNotificationListener(meterEventListener);
86         
87         if (null == meterListener) {
88             logger.error("Listener to listen on meter data modifcation events");
89             System.out.println("Listener to listen on meter data modifcation events.");
90             return;
91         }       
92         
93         commitHandler = new MeterDataCommitHandler();
94         FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
95     }
96     
97     
98     
99     private boolean allocateMeterCaches() {
100         if (this.clusterMeterContainerService == null) {
101             logger.warn("Meter: Un-initialized clusterMeterContainerService, can't create cache");
102             return false;
103         }       
104
105         try {
106             clusterMeterContainerService.createCache("frm.originalSwMeterView",
107                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
108
109             clusterMeterContainerService.createCache("frm.installedSwMeterView",
110                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
111
112             clusterMeterContainerService.createCache("frm.inactiveMeters",
113                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
114
115             clusterMeterContainerService.createCache("frm.nodeMeters",
116                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
117             
118 //TODO for cluster mode
119            /* clusterMeterContainerService.createCache(WORK_STATUS_CACHE,
120                     EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
121
122             clusterMeterContainerService.createCache(WORK_ORDER_CACHE,
123                     EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));*/
124             
125         } catch (CacheConfigException cce) {            
126             logger.error("Meter CacheConfigException");
127             return false;
128             
129         } catch (CacheExistException cce) {
130             logger.error(" Meter CacheExistException");           
131         }
132         
133         return true;
134     }
135     
136     private void nonClusterMeterObjectCreate() {
137         originalSwMeterView = new ConcurrentHashMap<MeterKey, Meter>();
138         installedSwMeterView = new ConcurrentHashMap<MeterKey, Meter>();
139         nodeMeters = new ConcurrentHashMap<Node, List<Meter>>();        
140         inactiveMeters = new ConcurrentHashMap<MeterKey, Meter>();
141     }
142     
143     @SuppressWarnings({ "unchecked" })
144     private boolean retrieveMeterCaches() {
145         ConcurrentMap<?, ?> map;
146
147         if (this.clusterMeterContainerService == null) {
148             logger.warn("Meter: un-initialized clusterMeterContainerService, can't retrieve cache");
149             nonClusterMeterObjectCreate();
150             return false;
151         }       
152
153         map = clusterMeterContainerService.getCache("frm.originalSwMeterView");
154         if (map != null) {
155             originalSwMeterView = (ConcurrentMap<MeterKey, Meter>) map;
156         } else {
157             logger.error("Retrieval of cache(originalSwMeterView) failed");
158             return false;
159         }
160
161         map = clusterMeterContainerService.getCache("frm.installedSwMeterView");
162         if (map != null) {
163             installedSwMeterView = (ConcurrentMap<MeterKey, Meter>) map;
164         } else {
165             logger.error("Retrieval of cache(installedSwMeterView) failed");
166             return false;
167         }
168
169         map = clusterMeterContainerService.getCache("frm.inactiveMeters");
170         if (map != null) {
171             inactiveMeters = (ConcurrentMap<MeterKey, Meter>) map;
172         } else {
173             logger.error("Retrieval of cache(inactiveMeters) failed");
174             return false;
175         }
176
177         map = clusterMeterContainerService.getCache("frm.nodeMeters");
178         if (map != null) {
179             nodeMeters = (ConcurrentMap<Node, List<Meter>>) map;
180         } else {
181             logger.error("Retrieval of cache(nodeMeter) failed");
182             return false;
183         }
184         
185         return true;
186     }
187     
188     private boolean cacheStartup() {
189         if (allocateMeterCaches()) {
190             if (retrieveMeterCaches()) {
191                 return true;
192             }
193         }
194         
195         return false;
196     }
197     
198     /**
199      * Adds Meter to the southbound plugin and our internal database
200      *
201      * @param path
202      * @param dataObject
203      */
204     private Status addMeter(InstanceIdentifier<?> path, Meter meterAddDataObject) {        
205         MeterKey meterKey = meterAddDataObject.getKey();
206         
207         if (null != meterKey && 
208                 validateMeter(meterAddDataObject, FRMUtil.operation.ADD).isSuccess()) {
209             if (meterAddDataObject.isInstall()) {
210                 AddMeterInputBuilder meterBuilder = new AddMeterInputBuilder();
211                 
212                 meterBuilder.setContainerName(meterAddDataObject.getContainerName());
213                 meterBuilder.setFlags(meterAddDataObject.getFlags());
214                 meterBuilder.setMeterBandHeaders(meterAddDataObject.getMeterBandHeaders());
215                 meterBuilder.setMeterId(meterAddDataObject.getMeterId());
216                 meterBuilder.setNode(meterAddDataObject.getNode());
217                 originalSwMeterView.put(meterKey, meterAddDataObject);
218                 meterService.addMeter(meterBuilder.build());
219             }
220             
221             originalSwMeterView.put(meterKey, meterAddDataObject);            
222         }
223         else {
224             return new Status(StatusCode.BADREQUEST, "Meter Key or attribute validation failed");
225         }
226       
227         return new Status(StatusCode.SUCCESS);
228     }
229     
230     /*
231      * Update Meter to the southbound plugin and our internal database
232      *
233      * @param path
234      * @param dataObject
235      */
236     private Status updateMeter(InstanceIdentifier<?> path, Meter meterUpdateDataObject) {        
237         MeterKey meterKey = meterUpdateDataObject.getKey();
238         
239         if (null != meterKey && 
240                 validateMeter(meterUpdateDataObject, FRMUtil.operation.ADD).isSuccess()) {
241             if (meterUpdateDataObject.isInstall()) {
242                 UpdateMeterInputBuilder updateMeterBuilder = new UpdateMeterInputBuilder();  
243                 
244                 originalSwMeterView.put(meterKey, meterUpdateDataObject);
245                 meterService.updateMeter(updateMeterBuilder.build());
246             }
247             
248             originalSwMeterView.put(meterKey, meterUpdateDataObject);            
249         }
250         else {
251             return new Status(StatusCode.BADREQUEST, "Meter Key or attribute validation failed");
252         }
253       
254         return new Status(StatusCode.SUCCESS);
255     }
256     
257     /*
258      * Remove Meter to the southbound plugin and our internal database
259      *
260      * @param path
261      * @param dataObject
262      */
263     private Status RemoveMeter(InstanceIdentifier<?> path, Meter meterUpdateDataObject) {        
264         MeterKey meterKey = meterUpdateDataObject.getKey();
265         
266         if (null != meterKey && 
267                 validateMeter(meterUpdateDataObject, FRMUtil.operation.ADD).isSuccess()) {
268             if (meterUpdateDataObject.isInstall()) {
269                 UpdateMeterInputBuilder updateMeterBuilder = new UpdateMeterInputBuilder();                
270                 
271                 originalSwMeterView.put(meterKey, meterUpdateDataObject);
272                 meterService.updateMeter(updateMeterBuilder.build());
273             }
274             
275             originalSwMeterView.put(meterKey, meterUpdateDataObject);            
276         }
277         else {
278             return new Status(StatusCode.BADREQUEST, "Meter Key or attribute validation failed");
279         }
280       
281         return new Status(StatusCode.SUCCESS);
282     }
283     
284     public Status validateMeter(Meter meter, FRMUtil.operation operation) {
285         String containerName;
286         String meterName;
287         Status returnStatus = null;
288         boolean returnResult;
289         
290         if (null != meter) {
291             containerName = meter.getContainerName();
292             
293             if (null == containerName) {
294                 containerName = GlobalConstants.DEFAULT.toString();
295             }
296             else if (!FRMUtil.isNameValid(containerName)) {
297                 logger.error("Container Name is invalid %s" + containerName);
298                 returnStatus = new Status(StatusCode.BADREQUEST, "Container Name is invalid");
299                 return returnStatus;
300             }
301             
302             meterName = meter.getMeterName();
303             if (!FRMUtil.isNameValid(meterName)) {
304                 logger.error("Meter Name is invalid %s" + meterName);
305                 returnStatus = new Status(StatusCode.BADREQUEST, "Meter Name is invalid");
306                 return returnStatus;
307             }
308             
309             returnResult = doesMeterEntryExists(meter.getKey(), meterName, containerName);
310             
311             if (FRMUtil.operation.ADD == operation && returnResult) {
312                 logger.error("Record with same Meter Name exists");
313                 returnStatus = new Status(StatusCode.BADREQUEST, "Meter record exists");
314                 return returnStatus;
315             }
316             else if (!returnResult) {
317                 logger.error("Group record does not exist");
318                 returnStatus = new Status(StatusCode.BADREQUEST, "Meter record does not exist");
319                 return returnStatus;
320             }
321           
322             for (int i = 0; i < meter.getMeterBandHeaders().getMeterBandHeader().size(); i++) {
323                 if (!meter.getFlags().isMeterBurst()) {
324                     if (0 < meter.getMeterBandHeaders().getMeterBandHeader().get(i).getBurstSize()) {
325                         logger.error("Burst size should only be associated when Burst FLAG is set");
326                         returnStatus = new Status(StatusCode.BADREQUEST, "Burst size should only be associated when Burst FLAG is set");
327                         break;
328                     }
329                 }
330             }
331             
332             if (null != returnStatus && !returnStatus.isSuccess()) {
333                 return returnStatus;
334             }
335             else {
336                 BandType setBandType = null;
337                 DscpRemark dscpRemark = null;
338                 for (int i = 0; i < meter.getMeterBandHeaders().getMeterBandHeader().size(); i++) {
339                     setBandType = meter.getMeterBandHeaders().getMeterBandHeader().get(i).getBandType();
340                     if ( setBandType instanceof DscpRemark) {   
341                         dscpRemark = (DscpRemark)setBandType;
342                         if (0 > dscpRemark.getRate()) {
343                            
344                         }
345                     }
346                     else if (setBandType instanceof Drop) {
347                         if (0 < dscpRemark.getPercLevel()) {
348                             logger.error("Number of drop Precedence level");
349                         }                        
350                     }
351                     else if (setBandType instanceof Experimenter) {
352                         
353                     }
354                 }                
355             }
356         }
357         return new Status(StatusCode.SUCCESS);
358     }
359     
360     private boolean doesMeterEntryExists(MeterKey key, String meterName, String containerName) {
361         if (! originalSwMeterView.containsKey(key)) {
362             return false;
363         }
364         
365         for (Entry<MeterKey, Meter> entry : originalSwMeterView.entrySet()) {
366             if (entry.getValue().getMeterName().equals(meterName)) {
367                 if (entry.getValue().getContainerName().equals(containerName)) {
368                     return true;
369                 }
370             }
371         }
372         return false;
373     }
374
375     
376     private RpcResult<Void> commitToPlugin(internalTransaction transaction) {
377         for(Entry<InstanceIdentifier<?>, Meter> entry :transaction.additions.entrySet()) {
378             
379             if (!addMeter(entry.getKey(),entry.getValue()).isSuccess()) {
380                 return Rpcs.getRpcResult(false, null, null);
381             }
382         }
383         for(@SuppressWarnings("unused") Entry<InstanceIdentifier<?>, Meter> entry :transaction.updates.entrySet()) {
384            
385             if (!updateMeter(entry.getKey(),entry.getValue()).isSuccess()) {
386                 return Rpcs.getRpcResult(false, null, null);
387             }
388         }
389         
390         for(InstanceIdentifier<?> removal : transaction.removals) {
391            /* if (!removeMeter(entry.getKey(),entry.getValue()).isSuccess()) {
392                 return Rpcs.getRpcResult(false, null, null);
393             }*/
394         }
395         
396         return Rpcs.getRpcResult(true, null, null);
397     }
398     
399     private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
400
401         private final DataModification<InstanceIdentifier<?>, DataObject> modification;
402
403         @Override
404         public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
405             return modification;
406         }
407
408         public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
409             this.modification = modification;
410         }
411
412         Map<InstanceIdentifier<?>, Meter> additions = new HashMap<>();
413         Map<InstanceIdentifier<?>, Meter> updates = new HashMap<>();
414         Set<InstanceIdentifier<?>> removals = new HashSet<>();
415
416         /**
417          * We create a plan which flows will be added, which will be updated and
418          * which will be removed based on our internal state.
419          * 
420          */
421         void prepareUpdate() {
422
423             Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
424             for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
425                 if (entry.getValue() instanceof Meter) {                    
426                     Meter Meter = (Meter) entry.getValue();                    
427                     preparePutEntry(entry.getKey(), Meter);
428                 }
429
430             }
431
432             removals = modification.getRemovedConfigurationData();
433         }
434
435         private void preparePutEntry(InstanceIdentifier<?> key, Meter meter) {
436             
437             Meter original = originalSwMeterView.get(key);
438             if (original != null) {
439                 // It is update for us
440                 
441                 updates.put(key, meter);               
442             } else {
443                 // It is addition for us
444                 
445                 additions.put(key, meter);
446             }
447         }
448
449         /**
450          * We are OK to go with execution of plan
451          * 
452          */
453         @Override
454         public RpcResult<Void> finish() throws IllegalStateException {
455             
456             RpcResult<Void> rpcStatus = commitToPlugin(this);
457             // We return true if internal transaction is successful.
458           //  return Rpcs.getRpcResult(true, null, Collections.emptySet());
459             return rpcStatus;
460         }
461
462         /**
463          * 
464          * We should rollback our preparation
465          * 
466          */
467         @Override
468         public RpcResult<Void> rollback() throws IllegalStateException {
469             // NOOP - we did not modified any internal state during
470             // requestCommit phase
471            // return Rpcs.getRpcResult(true, null, Collections.emptySet());
472             return Rpcs.getRpcResult(true, null, null);
473             
474         }
475         
476     }
477
478     private final class MeterDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
479         @Override
480         public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<InstanceIdentifier<?>, DataObject> requestCommit(
481                 DataModification<InstanceIdentifier<?>, DataObject> modification) {
482             // We should verify transaction
483             System.out.println("Coming in MeterDataCommitHandler");
484             internalTransaction transaction = new internalTransaction(modification);
485             transaction.prepareUpdate();
486             return transaction;
487         }
488     }
489     
490     final class MeterEventListener implements SalMeterListener {
491         
492         List<MeterAdded> addedMeter = new ArrayList<>();
493         List<MeterRemoved> removeMeter = new ArrayList<>();
494         List<MeterUpdated> updatedMeter = new ArrayList<>();
495
496         @Override
497         public void onMeterAdded(MeterAdded notification) {
498             // TODO Auto-generated method stub
499             
500         }
501
502         @Override
503         public void onMeterRemoved(MeterRemoved notification) {
504             // TODO Auto-generated method stub
505             
506         }
507
508         @Override
509         public void onMeterUpdated(MeterUpdated notification) {
510             // TODO Auto-generated method stub
511             
512         }    
513     }
514 }