Switch to using yangtools version of mockito-configuration
[controller.git] / opendaylight / md-sal / forwardingrules-manager / src / main / java / org / opendaylight / controller / forwardingrulesmanager / consumer / impl / FlowConsumerImpl.java
1 package org.opendaylight.controller.forwardingrulesmanager.consumer.impl;
2
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.HashSet;
6 import java.util.List;
7 import java.util.Map.Entry;
8 import java.util.Set;
9
10 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
11 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
12 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
13 import org.opendaylight.controller.sal.common.util.Rpcs;
14 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlowBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.config.rev131024.Tables;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.config.rev131024.tables.Table;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.config.rev131024.tables.TableBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.config.rev131024.tables.TableKey;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableRef;
36 import org.opendaylight.yangtools.concepts.Registration;
37 import org.opendaylight.yangtools.yang.binding.DataObject;
38 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
39 import org.opendaylight.yangtools.yang.binding.NotificationListener;
40 import org.opendaylight.yangtools.yang.common.RpcError;
41 import org.opendaylight.yangtools.yang.common.RpcResult;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 public class FlowConsumerImpl {
46     protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
47     private final FlowEventListener flowEventListener = new FlowEventListener();
48     private Registration<NotificationListener> listener1Reg;
49     private SalFlowService flowService;
50     // private FlowDataListener listener;
51     private FlowDataCommitHandler commitHandler;    
52
53     public FlowConsumerImpl() {
54         InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder(Flows.class).toInstance();
55         flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);
56
57         if (null == flowService) {
58             logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
59             return;
60         }
61         
62         // For switch events
63         listener1Reg = FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);
64
65         if (null == listener1Reg) {
66             logger.error("Listener to listen on flow data modifcation events");
67             return;
68         }
69         // addFlowTest();
70         commitHandler = new FlowDataCommitHandler();
71         FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);        
72     }
73     
74     /**
75      * Adds flow to the southbound plugin and our internal database
76      *
77      * @param path
78      * @param dataObject
79      */
80     private void addFlow(InstanceIdentifier<?> path, Flow dataObject) {
81
82         AddFlowInputBuilder input = new AddFlowInputBuilder();
83         input.fieldsFrom(dataObject);
84         input.setNode((dataObject).getNode());
85         input.setFlowTable(new FlowTableRef(createTableInstance(dataObject.getId(), dataObject.getNode())));
86         // We send flow to the sounthbound plugin
87         flowService.addFlow(input.build());
88     }
89
90     /**
91      * Removes flow to the southbound plugin and our internal database
92      *
93      * @param path
94      * @param dataObject
95      */
96     private void removeFlow(InstanceIdentifier<?> path, Flow dataObject) {
97         
98         RemoveFlowInputBuilder input = new RemoveFlowInputBuilder();
99         input.fieldsFrom(dataObject);
100         input.setNode((dataObject).getNode());
101         input.setTableId(dataObject.getTableId());
102         input.setFlowTable(new FlowTableRef(createTableInstance((long)dataObject.getTableId(), (dataObject).getNode())));
103         // We send flow to the sounthbound plugin
104         flowService.removeFlow(input.build());
105     }
106
107     /**
108      * Update flow to the southbound plugin and our internal database
109      *
110      * @param path
111      * @param dataObject
112      */
113     private void updateFlow(InstanceIdentifier<?> path, Flow updatedFlow, Flow originalFlow) {
114
115         UpdateFlowInputBuilder input = new UpdateFlowInputBuilder();
116         UpdatedFlowBuilder updatedflowbuilder = new UpdatedFlowBuilder();
117         updatedflowbuilder.fieldsFrom(updatedFlow);
118         input.setNode(updatedFlow.getNode());
119         input.setUpdatedFlow(updatedflowbuilder.build());  
120         OriginalFlowBuilder ofb = new OriginalFlowBuilder(originalFlow);
121         input.setOriginalFlow(ofb.build());
122         // We send flow to the sounthbound plugin
123         flowService.updateFlow(input.build());
124     }
125  
126     private void commitToPlugin(internalTransaction transaction) {
127         Set<Entry<InstanceIdentifier<?>, DataObject>> createdEntries = transaction.getModification()
128                 .getCreatedConfigurationData().entrySet();
129
130         /*
131          * This little dance is because updatedEntries contains both created and
132          * modified entries The reason I created a new HashSet is because the
133          * collections we are returned are immutable.
134          */
135         Set<Entry<InstanceIdentifier<?>, DataObject>> updatedEntries = new HashSet<Entry<InstanceIdentifier<?>, DataObject>>();
136         updatedEntries.addAll(transaction.getModification().getUpdatedConfigurationData().entrySet());
137         updatedEntries.removeAll(createdEntries);
138
139         Set<InstanceIdentifier<?>> removeEntriesInstanceIdentifiers = transaction.getModification()
140                 .getRemovedConfigurationData();
141         transaction.getModification().getOriginalConfigurationData();
142         for (Entry<InstanceIdentifier<?>, DataObject> entry : createdEntries) {
143             if (entry.getValue() instanceof Flow) {
144                 logger.debug("Coming add cc in FlowDatacommitHandler");
145                 Flow flow = (Flow) entry.getValue();
146                 boolean status = validate(flow);
147                 if (!status) {
148                     return;
149                 }
150                 addFlow(entry.getKey(), (Flow) entry.getValue());
151             }
152         }
153        
154         for (Entry<InstanceIdentifier<?>, DataObject> entry : updatedEntries) {
155             if (entry.getValue() instanceof Flow) {
156                 logger.debug("Coming update cc in FlowDatacommitHandler");
157                 Flow updatedFlow = (Flow) entry.getValue();
158                 Flow originalFlow = (Flow) transaction.modification.getOriginalConfigurationData().get(entry.getKey());
159                 boolean status = validate(updatedFlow);
160                 if (!status) {
161                     return;
162                 }
163                 updateFlow(entry.getKey(), updatedFlow, originalFlow);
164             }
165         }
166
167         for (InstanceIdentifier<?> instanceId : removeEntriesInstanceIdentifiers) {
168             DataObject removeValue = transaction.getModification().getOriginalConfigurationData().get(instanceId);
169             if (removeValue instanceof Flow) {
170                 logger.debug("Coming remove cc in FlowDatacommitHandler");
171                 Flow flow = (Flow) removeValue;
172                 boolean status = validate(flow);
173                 
174                 if (!status) {
175                     return;
176                 }
177                 
178                 removeFlow(instanceId, (Flow) removeValue);
179             }
180         }
181     }
182
183     private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {   
184      
185         @SuppressWarnings("unchecked")
186         public DataCommitTransaction<InstanceIdentifier<?>, DataObject> requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
187             // We should verify transaction
188             logger.debug("Coming in FlowDatacommitHandler");
189             internalTransaction transaction = new internalTransaction(modification);
190             transaction.prepareUpdate();
191             return transaction;
192         }
193     }
194
195     private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
196
197         private final DataModification<InstanceIdentifier<?>, DataObject> modification;
198
199         @Override
200         public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
201             return modification;
202         }
203
204         public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
205             this.modification = modification;
206         }
207         
208         /**
209          * We create a plan which flows will be added, which will be updated and
210          * which will be removed based on our internal state.
211          *
212          */
213         void prepareUpdate() {          
214
215         }
216        
217         /**
218          * We are OK to go with execution of plan
219          *
220          */
221         @Override
222         public RpcResult<Void> finish() throws IllegalStateException {
223             commitToPlugin(this);            
224             return Rpcs.getRpcResult(true, null, Collections.<RpcError> emptySet());
225         }
226
227         /**
228          *
229          * We should rollback our preparation
230          *
231          */
232         @Override
233         public RpcResult<Void> rollback() throws IllegalStateException {       
234             rollBackFlows(modification);
235             return Rpcs.getRpcResult(true, null, Collections.<RpcError> emptySet());
236
237         }       
238     }
239
240     private void rollBackFlows(DataModification<InstanceIdentifier<?>, DataObject> modification) {
241      Set<Entry<InstanceIdentifier<? extends DataObject>, DataObject>> createdEntries = modification.getCreatedConfigurationData().entrySet();
242
243     /*
244      * This little dance is because updatedEntries contains both created and modified entries
245      * The reason I created a new HashSet is because the collections we are returned are immutable.
246      */
247     Set<Entry<InstanceIdentifier<? extends DataObject>, DataObject>> updatedEntries = new HashSet<Entry<InstanceIdentifier<? extends DataObject>, DataObject>>();
248     updatedEntries.addAll(modification.getUpdatedConfigurationData().entrySet());
249     updatedEntries.removeAll(createdEntries);
250
251     Set<InstanceIdentifier<? >> removeEntriesInstanceIdentifiers = modification.getRemovedConfigurationData();
252     for (Entry<InstanceIdentifier<?>, DataObject> entry : createdEntries) {
253         if(entry.getValue() instanceof Flow) {
254             removeFlow(entry.getKey(),(Flow) entry.getValue()); // because we are rolling back, remove what we would have added.
255         }
256     }
257     
258     for (Entry<InstanceIdentifier<?>, DataObject> entry : updatedEntries) {
259         if(entry.getValue() instanceof Flow) {            
260             Flow updatedFlow = (Flow) entry.getValue();
261             Flow originalFlow = (Flow) modification.getOriginalConfigurationData().get(entry.getKey());
262             updateFlow(entry.getKey(), updatedFlow ,originalFlow);// because we are rolling back, replace the updated with the original
263         }
264     }
265
266     for (InstanceIdentifier<?> instanceId : removeEntriesInstanceIdentifiers ) {
267         DataObject removeValue = (Flow) modification.getOriginalConfigurationData().get(instanceId);
268         if(removeValue instanceof Flow) {
269             addFlow(instanceId,(Flow) removeValue);// because we are rolling back, add what we would have removed.
270
271         }
272     }
273 }
274     final class FlowEventListener implements SalFlowListener {
275
276         List<FlowAdded> addedFlows = new ArrayList<>();
277         List<FlowRemoved> removedFlows = new ArrayList<>();
278         List<FlowUpdated> updatedFlows = new ArrayList<>();
279
280         @Override
281         public void onFlowAdded(FlowAdded notification) {
282             addedFlows.add(notification);
283         }
284
285         @Override
286         public void onFlowRemoved(FlowRemoved notification) {
287             removedFlows.add(notification);
288         }
289
290         @Override
291         public void onFlowUpdated(FlowUpdated notification) {
292             updatedFlows.add(notification);
293         }
294
295         @Override
296         public void onNodeErrorNotification(NodeErrorNotification notification) {
297             // TODO Auto-generated method stub
298
299         }
300
301         @Override
302         public void onNodeExperimenterErrorNotification(NodeExperimenterErrorNotification notification) {
303             // TODO Auto-generated method stub
304
305         }
306
307         @Override
308         public void onSwitchFlowRemoved(SwitchFlowRemoved notification) {
309             // TODO Auto-generated method stub
310
311         }
312     }
313
314     public boolean validate(Flow flow) {
315         String msg = ""; // Specific part of warn/error log
316
317         boolean result = true;
318         // flow Name validation
319         if (!FRMUtil.isNameValid(flow.getFlowName())) {
320             msg = "Invalid Flow name";
321             result = false;
322         }
323         
324         // Node Validation
325         if (result == true && flow.getNode() == null) {
326             msg = "Node is null";
327             result = false;
328         }
329
330         // TODO: Validate we are seeking to program a flow against a valid
331         // Node
332
333         if (result == true && flow.getPriority() != null) {
334             if (flow.getPriority() < 0 || flow.getPriority() > 65535) {
335                 msg = String.format("priority %s is not in the range 0 - 65535", flow.getPriority());
336                 result = false;
337             }
338         }
339        
340         if (!FRMUtil.validateMatch(flow)) {
341             logger.error("Not a valid Match");
342             result = false;
343         }
344         if (!FRMUtil.validateInstructions(flow)) {
345             logger.error("Not a valid Instruction");
346             result = false;
347         }
348         if (result == false) {
349             logger.warn("Invalid Configuration for flow {}. The failure is {}", flow, msg);
350             logger.error("Invalid Configuration ({})", msg);
351         }
352         return result;
353     }
354     
355     private InstanceIdentifier<?> createTableInstance(Long tableId, NodeRef nodeRef) {        
356         Table table;
357         InstanceIdentifier<Table> tableInstance;
358         TableBuilder builder = new TableBuilder();
359         builder.setId(tableId);
360         builder.setKey(new TableKey(tableId, nodeRef));
361         table = builder.build();
362         tableInstance = InstanceIdentifier.builder(Tables.class).child(Table.class, table.getKey()).toInstance();
363         return tableInstance;
364     }
365 }