Merge "creating a default subnet"
[controller.git] / opendaylight / forwardingrulesmanager_mdsal / openflow / src / main / java / org / opendaylight / controller / forwardingrulesmanager_mdsal / consumer / impl / FlowConsumerImpl.java
1 package org.opendaylight.controller.forwardingrulesmanager_mdsal.consumer.impl;
2
3 import java.util.ArrayList;
4 import java.util.Collection;
5 import java.util.HashMap;
6 import java.util.HashSet;
7 import java.util.List;
8 import java.util.Map;
9 import java.util.Map.Entry;
10 import java.util.Set;
11 import java.util.concurrent.ConcurrentHashMap;
12 import java.util.concurrent.Future;
13
14 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
15 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
16 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
17 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
18 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
19 import org.opendaylight.controller.sal.common.util.Rpcs;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
23
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
38 import org.opendaylight.yangtools.concepts.Registration;
39 import org.opendaylight.yangtools.yang.binding.DataObject;
40 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
41 import org.opendaylight.yangtools.yang.binding.NotificationListener;
42 import org.opendaylight.yangtools.yang.common.RpcResult;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 public class FlowConsumerImpl {
47         protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
48         private FlowEventListener flowEventListener = new FlowEventListener();
49     private Registration<NotificationListener> listener1Reg;
50         private SalFlowService flowService;
51         private FlowDataListener listener;
52         private FlowDataCommitHandler commitHandler;
53         private ConcurrentHashMap<FlowKey, Flow> originalSwView;
54         
55     public FlowConsumerImpl() {         
56                 InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder().node(Flows.class).toInstance();
57         flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);
58                 
59                 if (null == flowService) {
60                         logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
61                 System.out.println("Consumer SAL Service is down or NULL.");
62                 return;
63                 }
64                 
65                 listener = new FlowDataListener();
66                 
67                 if (null == FRMConsumerImpl.getDataBrokerService().registerDataChangeListener(path, listener)) {
68                         logger.error("Failed to listen on flow data modifcation events");
69                 System.out.println("Consumer SAL Service is down or NULL.");
70                 return;
71                 }       
72                         
73                 // For switch events
74                 listener1Reg = FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);
75                 
76                 if (null == listener1Reg) {
77                         logger.error("Listener to listen on flow data modifcation events");
78                 System.out.println("Consumer SAL Service is down or NULL.");
79                 return;
80                 }
81                 //addFlowTest();
82                 System.out.println("-------------------------------------------------------------------");
83                 allocateCaches();
84                 commitHandler = new FlowDataCommitHandler();
85                 FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
86     }
87     
88     private void allocateCaches() {
89         originalSwView = new ConcurrentHashMap<FlowKey, Flow>();
90     }
91     
92     private void addFlowTest()
93     {
94         try {
95                         NodeRef nodeOne = createNodeRef("foo:node:1");
96                         AddFlowInputBuilder input1 = new AddFlowInputBuilder();
97                         
98                         input1.setNode(nodeOne);
99                         AddFlowInput firstMsg = input1.build();
100                         
101                         if(null != flowService) {
102                                 System.out.println(flowService.toString());
103                         }
104                         else
105                         {
106                                 System.out.println("ConsumerFlowService is NULL");
107                         }
108                         @SuppressWarnings("unused")
109                         Future<RpcResult<java.lang.Void>> result1 = flowService.addFlow(firstMsg);
110                         
111                         
112                 } catch (Exception e) {
113                         // TODO Auto-generated catch block
114                         e.printStackTrace();
115                 }
116     }
117     /**
118      * Adds flow to the southbound plugin and our internal database
119      *
120      * @param path
121      * @param dataObject
122      */
123     private void addFlow(InstanceIdentifier<?> path, Flow dataObject) {
124
125         AddFlowInputBuilder input = new AddFlowInputBuilder();
126         List<Instruction> inst = (dataObject).getInstructions().getInstruction();
127         input.setNode((dataObject).getNode());
128         input.setPriority((dataObject).getPriority());
129         input.setMatch((dataObject).getMatch());
130         input.setCookie((dataObject).getCookie());
131         input.setInstructions((dataObject).getInstructions());
132         dataObject.getMatch().getLayer3Match()
133         for (int i=0;i<inst.size();i++) {
134             System.out.println("i = "+ i + inst.get(i).getInstruction().toString());
135             System.out.println("i = "+ i + inst.get(i).toString());
136         }
137         
138         System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
139
140         // We send flow to the sounthbound plugin
141         flowService.addFlow(input.build());
142     }
143     
144     private void commitToPlugin(internalTransaction transaction) {
145         for(Entry<InstanceIdentifier<?>, Flow> entry :transaction.additions.entrySet()) {
146             System.out.println("Coming add cc in FlowDatacommitHandler");
147             addFlow(entry.getKey(),entry.getValue());
148         }
149         for(@SuppressWarnings("unused") Entry<InstanceIdentifier<?>, Flow> entry :transaction.updates.entrySet()) {
150             System.out.println("Coming update cc in FlowDatacommitHandler");
151            // updateFlow(entry.getKey(),entry.getValue());
152         }
153         
154         for(@SuppressWarnings("unused") InstanceIdentifier<?> removal : transaction.removals) {
155            // removeFlow(removal);
156         }
157     }
158     
159     private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
160
161          @SuppressWarnings("unchecked")
162                 @Override
163          public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
164              // We should verify transaction
165                  System.out.println("Coming in FlowDatacommitHandler");
166              internalTransaction transaction = new internalTransaction(modification);
167              transaction.prepareUpdate();
168              return transaction;
169          }
170     }
171
172     private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
173
174         private final DataModification<InstanceIdentifier<?>, DataObject> modification;
175
176         @Override
177         public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
178             return modification;
179         }
180
181         public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
182             this.modification = modification;
183         }
184
185         Map<InstanceIdentifier<?>, Flow> additions = new HashMap<>();
186         Map<InstanceIdentifier<?>, Flow> updates = new HashMap<>();
187         Set<InstanceIdentifier<?>> removals = new HashSet<>();
188
189         /**
190          * We create a plan which flows will be added, which will be updated and
191          * which will be removed based on our internal state.
192          * 
193          */
194         void prepareUpdate() {
195
196             Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
197             for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
198                 if (entry.getValue() instanceof Flow) {
199                     Flow flow = (Flow) entry.getValue();
200                     preparePutEntry(entry.getKey(), flow);
201                 }
202
203             }
204
205             removals = modification.getRemovedConfigurationData();
206         }
207
208         private void preparePutEntry(InstanceIdentifier<?> key, Flow flow) {
209             Flow original = originalSwView.get(key);
210             if (original != null) {
211                 // It is update for us
212                 System.out.println("Coming update  in FlowDatacommitHandler");
213                 updates.put(key, flow);
214             } else {
215                 // It is addition for us
216                 System.out.println("Coming add in FlowDatacommitHandler");
217                 additions.put(key, flow);
218             }
219         }
220
221         /**
222          * We are OK to go with execution of plan
223          * 
224          */
225         @Override
226         public RpcResult<Void> finish() throws IllegalStateException {
227             
228             commitToPlugin(this);
229             // We return true if internal transaction is successful.
230           //  return Rpcs.getRpcResult(true, null, Collections.emptySet());
231                 return Rpcs.getRpcResult(true, null, null);
232         }
233
234         /**
235          * 
236          * We should rollback our preparation
237          * 
238          */
239         @Override
240         public RpcResult<Void> rollback() throws IllegalStateException {
241             // NOOP - we did not modified any internal state during
242             // requestCommit phase
243            // return Rpcs.getRpcResult(true, null, Collections.emptySet());
244             return Rpcs.getRpcResult(true, null, null);
245                 
246         }
247
248     }
249     
250         final class FlowEventListener implements SalFlowListener {
251         
252         List<FlowAdded> addedFlows = new ArrayList<>();
253         List<FlowRemoved> removedFlows = new ArrayList<>();
254         List<FlowUpdated> updatedFlows = new ArrayList<>();
255         
256         @Override
257         public void onFlowAdded(FlowAdded notification) {
258                 System.out.println("added flow..........................");
259         addedFlows.add(notification);
260             }
261         
262             @Override
263             public void onFlowRemoved(FlowRemoved notification) {
264                 removedFlows.add(notification);
265             };
266         
267             @Override
268             public void onFlowUpdated(FlowUpdated notification) {
269                 updatedFlows.add(notification);
270             }
271         
272         }
273         
274         final class FlowDataListener implements DataChangeListener {
275                 private SalFlowService flowService;
276                 
277                 public FlowDataListener() {
278                         
279                 }
280                 
281                 @Override
282                 public void onDataChanged(
283                                 DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {                    
284                         System.out.println("Coming in onDataChange..............");
285                         @SuppressWarnings("unchecked")
286                         Collection<DataObject> additions = (Collection<DataObject>) change.getCreatedConfigurationData();
287                         // we can check for getCreated, getDeleted or getUpdated from DataChange Event class
288                         for (DataObject dataObject : additions) {
289                             if (dataObject instanceof NodeFlow) {
290                                 NodeRef nodeOne = createNodeRef("foo:node:1");
291                                         // validating the dataObject here                               
292                                     AddFlowInputBuilder input = new AddFlowInputBuilder();
293                                     input.setNode(((NodeFlow) dataObject).getNode());
294                                     input.setNode(nodeOne);
295                                           //  input.setPriority(((NodeFlow) dataObject).getPriority());
296                                         //input.setMatch(((NodeFlow) dataObject).getMatch());
297                                         //input.setFlowTable(((NodeFlow) dataObject).getFlowTable());
298                                         //input.setCookie(((NodeFlow) dataObject).getCookie());
299                                         //input.setAction(((NodeFlow) dataObject).getAction());
300         
301                                 @SuppressWarnings("unused")
302                                         Future<RpcResult<java.lang.Void>> result = flowService.addFlow(input.build());
303                     }
304                         }       
305                 } 
306         }
307                                 
308             
309             
310     private static NodeRef createNodeRef(String string) {
311         NodeKey key = new NodeKey(new NodeId(string));
312         InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
313                 .toInstance();
314
315         return new NodeRef(path);
316     }
317             
318         
319
320 }