Flow removed switch event. Group and Meter update RPC
[controller.git] / opendaylight / forwardingrulesmanager_mdsal / openflow / src / main / java / org / opendaylight / controller / forwardingrulesmanager_mdsal / consumer / impl / MeterConsumerImpl.java
1 package org.opendaylight.controller.forwardingrulesmanager_mdsal.consumer.impl;
2
3 import java.util.ArrayList;
4 import java.util.EnumSet;
5 import java.util.HashMap;
6 import java.util.HashSet;
7 import java.util.List;
8 import java.util.Map;
9 import java.util.Set;
10 import java.util.Map.Entry;
11 import java.util.concurrent.ConcurrentHashMap;
12 import java.util.concurrent.ConcurrentMap;
13
14 import org.opendaylight.controller.clustering.services.CacheConfigException;
15 import org.opendaylight.controller.clustering.services.CacheExistException;
16 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
17 import org.opendaylight.controller.clustering.services.IClusterServices;
18 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
19 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
20 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
21 import org.opendaylight.controller.sal.common.util.Rpcs;
22 import org.opendaylight.controller.sal.core.IContainer;
23 import org.opendaylight.controller.sal.core.Node;
24 import org.opendaylight.controller.sal.utils.GlobalConstants;
25 import org.opendaylight.controller.sal.utils.Status;
26 import org.opendaylight.controller.sal.utils.StatusCode;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.config.rev131024.Meters;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.config.rev131024.meters.MeterKey;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInputBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAdded;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterRemoved;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdated;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterListener;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterInputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.meter.update.UpdatedMeterBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.band.type.BandType;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.band.type.band.type.Drop;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.band.type.band.type.DscpRemark;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.band.type.band.type.Experimenter;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.config.rev131024.meters.Meter;
42 import org.opendaylight.yangtools.concepts.Registration;
43 import org.opendaylight.yangtools.yang.binding.DataObject;
44 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
45 import org.opendaylight.yangtools.yang.binding.NotificationListener;
46 import org.opendaylight.yangtools.yang.common.RpcResult;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 public class MeterConsumerImpl {
51     protected static final Logger logger = LoggerFactory.getLogger(MeterConsumerImpl.class);
52     private MeterEventListener meterEventListener = new MeterEventListener();
53     private Registration<NotificationListener> meterListener;
54     private SalMeterService meterService;
55     private MeterDataCommitHandler commitHandler;
56
57     private ConcurrentMap<MeterKey, Meter> originalSwMeterView;
58     private ConcurrentMap<MeterKey, Meter> installedSwMeterView;
59
60     private ConcurrentMap<Node, List<Meter>> nodeMeters;
61     private ConcurrentMap<MeterKey, Meter> inactiveMeters;
62
63     private IClusterContainerServices clusterMeterContainerService = null;
64     private IContainer container;
65
66     public MeterConsumerImpl() {
67         InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder().node(Meters.class).node(Meter.class).toInstance();
68         meterService = FRMConsumerImpl.getProviderSession().getRpcService(SalMeterService.class);        
69         clusterMeterContainerService = FRMConsumerImpl.getClusterContainerService();
70
71         container = FRMConsumerImpl.getContainer();
72
73         if (!(cacheStartup())) {
74             logger.error("Unable to allocate/retrieve meter cache");
75             System.out.println("Unable to allocate/retrieve meter cache");
76         }
77
78         if (null == meterService) {
79             logger.error("Consumer SAL Meter Service is down or NULL. FRM may not function as intended");
80             System.out.println("Consumer SAL Meter Service is down or NULL.");
81             return;
82         }
83
84         // For switch/plugin events
85         meterListener = FRMConsumerImpl.getNotificationService().registerNotificationListener(meterEventListener);
86
87         if (null == meterListener) {
88             logger.error("Listener to listen on meter data modifcation events");
89             System.out.println("Listener to listen on meter data modifcation events.");
90             return;
91         }
92
93         commitHandler = new MeterDataCommitHandler();
94         FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
95     }
96
97     private boolean allocateMeterCaches() {
98         if (this.clusterMeterContainerService == null) {
99             logger.warn("Meter: Un-initialized clusterMeterContainerService, can't create cache");
100             return false;
101         }
102
103         try {
104             clusterMeterContainerService.createCache("frm.originalSwMeterView",
105                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
106
107             clusterMeterContainerService.createCache("frm.installedSwMeterView",
108                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
109
110             clusterMeterContainerService.createCache("frm.inactiveMeters",
111                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
112
113             clusterMeterContainerService.createCache("frm.nodeMeters",
114                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
115
116             // TODO for cluster mode
117             /*
118              * clusterMeterContainerService.createCache(WORK_STATUS_CACHE,
119              * EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
120              * IClusterServices.cacheMode.ASYNC));
121              *
122              * clusterMeterContainerService.createCache(WORK_ORDER_CACHE,
123              * EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
124              * IClusterServices.cacheMode.ASYNC));
125              */
126
127         } catch (CacheConfigException cce) {
128             logger.error("Meter CacheConfigException");
129             return false;
130
131         } catch (CacheExistException cce) {
132             logger.error(" Meter CacheExistException");
133         }
134
135         return true;
136     }
137
138     private void nonClusterMeterObjectCreate() {
139         originalSwMeterView = new ConcurrentHashMap<MeterKey, Meter>();
140         installedSwMeterView = new ConcurrentHashMap<MeterKey, Meter>();
141         nodeMeters = new ConcurrentHashMap<Node, List<Meter>>();
142         inactiveMeters = new ConcurrentHashMap<MeterKey, Meter>();
143     }
144
145     @SuppressWarnings({ "unchecked" })
146     private boolean retrieveMeterCaches() {
147         ConcurrentMap<?, ?> map;
148
149         if (this.clusterMeterContainerService == null) {
150             logger.warn("Meter: un-initialized clusterMeterContainerService, can't retrieve cache");
151             nonClusterMeterObjectCreate();
152             return false;
153         }
154
155         map = clusterMeterContainerService.getCache("frm.originalSwMeterView");
156         if (map != null) {
157             originalSwMeterView = (ConcurrentMap<MeterKey, Meter>) map;
158         } else {
159             logger.error("Retrieval of cache(originalSwMeterView) failed");
160             return false;
161         }
162
163         map = clusterMeterContainerService.getCache("frm.installedSwMeterView");
164         if (map != null) {
165             installedSwMeterView = (ConcurrentMap<MeterKey, Meter>) map;
166         } else {
167             logger.error("Retrieval of cache(installedSwMeterView) failed");
168             return false;
169         }
170
171         map = clusterMeterContainerService.getCache("frm.inactiveMeters");
172         if (map != null) {
173             inactiveMeters = (ConcurrentMap<MeterKey, Meter>) map;
174         } else {
175             logger.error("Retrieval of cache(inactiveMeters) failed");
176             return false;
177         }
178
179         map = clusterMeterContainerService.getCache("frm.nodeMeters");
180         if (map != null) {
181             nodeMeters = (ConcurrentMap<Node, List<Meter>>) map;
182         } else {
183             logger.error("Retrieval of cache(nodeMeter) failed");
184             return false;
185         }
186
187         return true;
188     }
189
190     private boolean cacheStartup() {
191         if (allocateMeterCaches()) {
192             if (retrieveMeterCaches()) {
193                 return true;
194             }
195         }
196
197         return false;
198     }
199
200     /**
201      * Adds Meter to the southbound plugin and our internal database
202      *
203      * @param path
204      * @param dataObject
205      */
206     private Status addMeter(InstanceIdentifier<?> path, Meter meterAddDataObject) {
207         MeterKey meterKey = meterAddDataObject.getKey();
208
209         if (null != meterKey && validateMeter(meterAddDataObject, FRMUtil.operation.ADD).isSuccess()) {
210             if (meterAddDataObject.isInstall()) {
211                 AddMeterInputBuilder meterBuilder = new AddMeterInputBuilder();
212
213                 meterBuilder.setContainerName(meterAddDataObject.getContainerName());
214                 meterBuilder.setFlags(meterAddDataObject.getFlags());
215                 meterBuilder.setMeterBandHeaders(meterAddDataObject.getMeterBandHeaders());
216                 meterBuilder.setMeterId(meterAddDataObject.getMeterId());
217                 meterBuilder.setNode(meterAddDataObject.getNode());
218                 originalSwMeterView.put(meterKey, meterAddDataObject);
219                 meterService.addMeter(meterBuilder.build());
220             }
221             
222             originalSwMeterView.put(meterKey, meterAddDataObject);            
223         }
224         else {
225             return new Status(StatusCode.BADREQUEST, "Meter Key or attribute validation failed");
226         }
227
228         return new Status(StatusCode.SUCCESS);
229     }
230
231     /*
232      * Update Meter to the southbound plugin and our internal database
233      *
234      * @param path
235      *
236      * @param dataObject
237      */
238     private Status updateMeter(InstanceIdentifier<?> path, Meter meterUpdateDataObject) {        
239         MeterKey meterKey = meterUpdateDataObject.getKey();
240         UpdatedMeterBuilder updateMeterBuilder = null;
241         
242         
243         if (null != meterKey && 
244                 validateMeter(meterUpdateDataObject, FRMUtil.operation.UPDATE).isSuccess()) {
245             
246             if (originalSwMeterView.containsKey(meterKey)) {
247                 originalSwMeterView.remove(meterKey);
248                 originalSwMeterView.put(meterKey, meterUpdateDataObject);
249             }
250             
251             if (meterUpdateDataObject.isInstall()) {
252                 UpdateMeterInputBuilder updateMeterInputBuilder = new UpdateMeterInputBuilder(); 
253                 updateMeterBuilder = new UpdatedMeterBuilder();
254                 updateMeterBuilder.fieldsFrom(meterUpdateDataObject);
255                 updateMeterInputBuilder.setUpdatedMeter(updateMeterBuilder.build());
256                 
257                 if (installedSwMeterView.containsKey(meterKey)) {
258                     installedSwMeterView.remove(meterKey);
259                     installedSwMeterView.put(meterKey, meterUpdateDataObject);
260                 }
261                 
262                 meterService.updateMeter(updateMeterInputBuilder.build());
263             }
264                         
265         }
266         else {
267             return new Status(StatusCode.BADREQUEST, "Meter Key or attribute validation failed");
268         }
269
270         return new Status(StatusCode.SUCCESS);
271     }
272
273     /*
274      * Remove Meter to the southbound plugin and our internal database
275      *
276      * @param path
277      *
278      * @param dataObject
279      */
280     private Status RemoveMeter(InstanceIdentifier<?> path, Meter meterUpdateDataObject) {        
281         MeterKey meterKey = meterUpdateDataObject.getKey();
282         
283         if (null != meterKey && 
284                 validateMeter(meterUpdateDataObject, FRMUtil.operation.ADD).isSuccess()) {
285             if (meterUpdateDataObject.isInstall()) {
286                 UpdateMeterInputBuilder updateMeterBuilder = new UpdateMeterInputBuilder();                
287                 
288                 installedSwMeterView.put(meterKey, meterUpdateDataObject);
289                 meterService.updateMeter(updateMeterBuilder.build());
290             }
291             
292             originalSwMeterView.put(meterKey, meterUpdateDataObject);            
293         }
294         else {
295             return new Status(StatusCode.BADREQUEST, "Meter Key or attribute validation failed");
296         }
297
298         return new Status(StatusCode.SUCCESS);
299     }
300
301     public Status validateMeter(Meter meter, FRMUtil.operation operation) {
302         String containerName;
303         String meterName;
304         Status returnStatus = null;
305         boolean returnResult;
306
307         if (null != meter) {
308             containerName = meter.getContainerName();
309
310             if (null == containerName) {
311                 containerName = GlobalConstants.DEFAULT.toString();
312             } else if (!FRMUtil.isNameValid(containerName)) {
313                 logger.error("Container Name is invalid %s" + containerName);
314                 returnStatus = new Status(StatusCode.BADREQUEST, "Container Name is invalid");
315                 return returnStatus;
316             }
317
318             meterName = meter.getMeterName();
319             if (!FRMUtil.isNameValid(meterName)) {
320                 logger.error("Meter Name is invalid %s" + meterName);
321                 returnStatus = new Status(StatusCode.BADREQUEST, "Meter Name is invalid");
322                 return returnStatus;
323             }
324
325             returnResult = doesMeterEntryExists(meter.getKey(), meterName, containerName);
326
327             if (FRMUtil.operation.ADD == operation && returnResult) {
328                 logger.error("Record with same Meter Name exists");
329                 returnStatus = new Status(StatusCode.BADREQUEST, "Meter record exists");
330                 return returnStatus;
331             } else if (!returnResult) {
332                 logger.error("Group record does not exist");
333                 returnStatus = new Status(StatusCode.BADREQUEST, "Meter record does not exist");
334                 return returnStatus;
335             }
336
337             for (int i = 0; i < meter.getMeterBandHeaders().getMeterBandHeader().size(); i++) {
338                 if (!meter.getFlags().isMeterBurst()) {
339                     if (0 < meter.getMeterBandHeaders().getMeterBandHeader().get(i).getBurstSize()) {
340                         logger.error("Burst size should only be associated when Burst FLAG is set");
341                         returnStatus = new Status(StatusCode.BADREQUEST,
342                                 "Burst size should only be associated when Burst FLAG is set");
343                         break;
344                     }
345                 }
346             }
347
348             if (null != returnStatus && !returnStatus.isSuccess()) {
349                 return returnStatus;
350             } else {
351                 BandType setBandType = null;
352                 DscpRemark dscpRemark = null;
353                 for (int i = 0; i < meter.getMeterBandHeaders().getMeterBandHeader().size(); i++) {
354                     setBandType = meter.getMeterBandHeaders().getMeterBandHeader().get(i).getBandType();
355                     if (setBandType instanceof DscpRemark) {
356                         dscpRemark = (DscpRemark) setBandType;
357                         if (0 > dscpRemark.getRate()) {
358
359                         }
360                     } else if (setBandType instanceof Drop) {
361                         if (0 < dscpRemark.getPercLevel()) {
362                             logger.error("Number of drop Precedence level");
363                         }
364                     } else if (setBandType instanceof Experimenter) {
365
366                     }
367                 }
368             }
369         }
370         return new Status(StatusCode.SUCCESS);
371     }
372
373     private boolean doesMeterEntryExists(MeterKey key, String meterName, String containerName) {
374         if (!originalSwMeterView.containsKey(key)) {
375             return false;
376         }
377
378         for (Entry<MeterKey, Meter> entry : originalSwMeterView.entrySet()) {
379             if (entry.getValue().getMeterName().equals(meterName)) {
380                 if (entry.getValue().getContainerName().equals(containerName)) {
381                     return true;
382                 }
383             }
384         }
385         return false;
386     }
387
388     private RpcResult<Void> commitToPlugin(internalTransaction transaction) {
389         for (Entry<InstanceIdentifier<?>, Meter> entry : transaction.additions.entrySet()) {
390
391             if (!addMeter(entry.getKey(), entry.getValue()).isSuccess()) {
392                 return Rpcs.getRpcResult(false, null, null);
393             }
394         }
395         for (@SuppressWarnings("unused")
396         Entry<InstanceIdentifier<?>, Meter> entry : transaction.updates.entrySet()) {
397
398             if (!updateMeter(entry.getKey(), entry.getValue()).isSuccess()) {
399                 return Rpcs.getRpcResult(false, null, null);
400             }
401         }
402
403         for (InstanceIdentifier<?> removal : transaction.removals) {
404             /*
405              * if (!removeMeter(entry.getKey(),entry.getValue()).isSuccess()) {
406              * return Rpcs.getRpcResult(false, null, null); }
407              */
408         }
409
410         return Rpcs.getRpcResult(true, null, null);
411     }
412
413     private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
414
415         private final DataModification<InstanceIdentifier<?>, DataObject> modification;
416
417         @Override
418         public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
419             return modification;
420         }
421
422         public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
423             this.modification = modification;
424         }
425
426         Map<InstanceIdentifier<?>, Meter> additions = new HashMap<>();
427         Map<InstanceIdentifier<?>, Meter> updates = new HashMap<>();
428         Set<InstanceIdentifier<?>> removals = new HashSet<>();
429
430         /**
431          * We create a plan which flows will be added, which will be updated and
432          * which will be removed based on our internal state.
433          *
434          */
435         void prepareUpdate() {
436
437             Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
438             for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
439                 if (entry.getValue() instanceof Meter) {
440                     Meter Meter = (Meter) entry.getValue();
441                     preparePutEntry(entry.getKey(), Meter);
442                 }
443
444             }
445
446             removals = modification.getRemovedConfigurationData();
447         }
448
449         private void preparePutEntry(InstanceIdentifier<?> key, Meter meter) {
450
451             Meter original = originalSwMeterView.get(key);
452             if (original != null) {
453                 // It is update for us
454
455                 updates.put(key, meter);
456             } else {
457                 // It is addition for us
458
459                 additions.put(key, meter);
460             }
461         }
462
463         /**
464          * We are OK to go with execution of plan
465          *
466          */
467         @Override
468         public RpcResult<Void> finish() throws IllegalStateException {
469
470             RpcResult<Void> rpcStatus = commitToPlugin(this);
471             // We return true if internal transaction is successful.
472             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
473             return rpcStatus;
474         }
475
476         /**
477          *
478          * We should rollback our preparation
479          *
480          */
481         @Override
482         public RpcResult<Void> rollback() throws IllegalStateException {
483             // NOOP - we did not modified any internal state during
484             // requestCommit phase
485             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
486             return Rpcs.getRpcResult(true, null, null);
487
488         }
489
490     }
491
492     private final class MeterDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
493         @Override
494         public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<InstanceIdentifier<?>, DataObject> requestCommit(
495                 DataModification<InstanceIdentifier<?>, DataObject> modification) {
496             // We should verify transaction
497             System.out.println("Coming in MeterDataCommitHandler");
498             internalTransaction transaction = new internalTransaction(modification);
499             transaction.prepareUpdate();
500             return transaction;
501         }
502     }
503
504     final class MeterEventListener implements SalMeterListener {
505
506         List<MeterAdded> addedMeter = new ArrayList<>();
507         List<MeterRemoved> removeMeter = new ArrayList<>();
508         List<MeterUpdated> updatedMeter = new ArrayList<>();
509
510         @Override
511         public void onMeterAdded(MeterAdded notification) {
512             // TODO Auto-generated method stub
513
514         }
515
516         @Override
517         public void onMeterRemoved(MeterRemoved notification) {
518             // TODO Auto-generated method stub
519
520         }
521
522         @Override
523         public void onMeterUpdated(MeterUpdated notification) {
524             // TODO Auto-generated method stub
525
526         }
527     }
528 }