Initial implementation of the ClusteredDataStore
[controller.git] / opendaylight / forwardingrulesmanager_mdsal / openflow / src / main / java / org / opendaylight / controller / forwardingrulesmanager_mdsal / consumer / impl / FlowConsumerImpl.java
1 package org.opendaylight.controller.forwardingrulesmanager_mdsal.consumer.impl;
2
3 import java.util.ArrayList;
4 import java.util.Collection;
5 import java.util.HashMap;
6 import java.util.HashSet;
7 import java.util.List;
8 import java.util.Map;
9 import java.util.Map.Entry;
10 import java.util.Set;
11 import java.util.concurrent.ConcurrentHashMap;
12 import java.util.concurrent.Future;
13
14 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
15 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
16 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
17 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
18 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
19 import org.opendaylight.controller.sal.common.util.Rpcs;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
36 import org.opendaylight.yangtools.concepts.Registration;
37 import org.opendaylight.yangtools.yang.binding.DataObject;
38 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
39 import org.opendaylight.yangtools.yang.binding.NotificationListener;
40 import org.opendaylight.yangtools.yang.common.RpcResult;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 public class FlowConsumerImpl {
45         protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
46         private FlowEventListener flowEventListener = new FlowEventListener();
47     private Registration<NotificationListener> listener1Reg;
48         private SalFlowService flowService;
49         private FlowDataListener listener;
50         private FlowDataCommitHandler commitHandler;
51         private ConcurrentHashMap<FlowKey, Flow> originalSwView;
52         
53     public FlowConsumerImpl() {         
54                 InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder().node(Flows.class).toInstance();
55         flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);
56                 
57                 if (null == flowService) {
58                         logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
59                 System.out.println("Consumer SAL Service is down or NULL.");
60                 return;
61                 }
62                 
63                 listener = new FlowDataListener();
64                 if (null == FRMConsumerImpl.getDataBrokerService().registerDataChangeListener(path, listener)) {
65                         logger.error("Failed to listen on flow data modifcation events");
66                 System.out.println("Consumer SAL Service is down or NULL.");
67                 return;
68                 }       
69                         
70                 // For switch events
71                 listener1Reg = FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);
72                 
73                 if (null == listener1Reg) {
74                         logger.error("Listener to listen on flow data modifcation events");
75                 System.out.println("Consumer SAL Service is down or NULL.");
76                 return;
77                 }
78                 addFlowTest();
79                 System.out.println("-------------------------------------------------------------------");
80                 allocateCaches();
81                 commitHandler = new FlowDataCommitHandler();
82                 FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
83     }
84     
85     private void allocateCaches() {
86         originalSwView = new ConcurrentHashMap<FlowKey, Flow>();
87     }
88     
89     private void addFlowTest()
90     {
91         try {
92                         NodeRef nodeOne = createNodeRef("foo:node:1");
93                         AddFlowInputBuilder input1 = new AddFlowInputBuilder();
94                         
95                         input1.setNode(nodeOne);
96                         AddFlowInput firstMsg = input1.build();
97                         
98                         if(null != flowService) {
99                                 System.out.println(flowService.toString());
100                         }
101                         else
102                         {
103                                 System.out.println("ConsumerFlowService is NULL");
104                         }
105                         @SuppressWarnings("unused")
106                         Future<RpcResult<java.lang.Void>> result1 = flowService.addFlow(firstMsg);
107                         
108                         
109                 } catch (Exception e) {
110                         // TODO Auto-generated catch block
111                         e.printStackTrace();
112                 }
113     }
114     /**
115      * Adds flow to the southbound plugin and our internal database
116      *
117      * @param path
118      * @param dataObject
119      */
120     private void addFlow(InstanceIdentifier<?> path, Flow dataObject) {
121
122         AddFlowInputBuilder input = new AddFlowInputBuilder();
123         input.setNode((dataObject).getNode());
124         input.setPriority((dataObject).getPriority());
125         input.setMatch((dataObject).getMatch());
126         input.setCookie((dataObject).getCookie());
127         input.setAction((dataObject).getAction());
128
129         // We send flow to the sounthbound plugin
130         flowService.addFlow(input.build());
131     }
132     
133     private void commitToPlugin(internalTransaction transaction) {
134         for(Entry<InstanceIdentifier<?>, Flow> entry :transaction.additions.entrySet()) {
135             addFlow(entry.getKey(),entry.getValue());
136         }
137         for(@SuppressWarnings("unused") Entry<InstanceIdentifier<?>, Flow> entry :transaction.additions.entrySet()) {
138            // updateFlow(entry.getKey(),entry.getValue());
139         }
140         
141         for(@SuppressWarnings("unused") InstanceIdentifier<?> removal : transaction.removals) {
142            // removeFlow(removal);
143         }
144     }
145     
146     private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
147
148          @SuppressWarnings("unchecked")
149                 @Override
150          public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
151              // We should verify transaction
152                  System.out.println("Coming in FlowDatacommitHandler");
153              internalTransaction transaction = new internalTransaction(modification);
154              transaction.prepareUpdate();
155              return transaction;
156          }
157     }
158
159     private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
160
161         private final DataModification<InstanceIdentifier<?>, DataObject> modification;
162
163         @Override
164         public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
165             return modification;
166         }
167
168         public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
169             this.modification = modification;
170         }
171
172         Map<InstanceIdentifier<?>, Flow> additions = new HashMap<>();
173         Map<InstanceIdentifier<?>, Flow> updates = new HashMap<>();
174         Set<InstanceIdentifier<?>> removals = new HashSet<>();
175
176         /**
177          * We create a plan which flows will be added, which will be updated and
178          * which will be removed based on our internal state.
179          * 
180          */
181         void prepareUpdate() {
182
183             Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
184             for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
185                 if (entry.getValue() instanceof Flow) {
186                     Flow flow = (Flow) entry.getValue();
187                     preparePutEntry(entry.getKey(), flow);
188                 }
189
190             }
191
192             removals = modification.getRemovedConfigurationData();
193         }
194
195         private void preparePutEntry(InstanceIdentifier<?> key, Flow flow) {
196             Flow original = originalSwView.get(key);
197             if (original != null) {
198                 // It is update for us
199                 updates.put(key, flow);
200             } else {
201                 // It is addition for us
202                 additions.put(key, flow);
203             }
204         }
205
206         /**
207          * We are OK to go with execution of plan
208          * 
209          */
210         @Override
211         public RpcResult<Void> finish() throws IllegalStateException {
212             
213             commitToPlugin(this);
214             // We return true if internal transaction is successful.
215           //  return Rpcs.getRpcResult(true, null, Collections.emptySet());
216                 return Rpcs.getRpcResult(true, null, null);
217         }
218
219         /**
220          * 
221          * We should rollback our preparation
222          * 
223          */
224         @Override
225         public RpcResult<Void> rollback() throws IllegalStateException {
226             // NOOP - we did not modified any internal state during
227             // requestCommit phase
228            // return Rpcs.getRpcResult(true, null, Collections.emptySet());
229             return Rpcs.getRpcResult(true, null, null);
230                 
231         }
232
233     }
234     
235         final class FlowEventListener implements SalFlowListener {
236         
237         List<FlowAdded> addedFlows = new ArrayList<>();
238         List<FlowRemoved> removedFlows = new ArrayList<>();
239         List<FlowUpdated> updatedFlows = new ArrayList<>();
240         
241         @Override
242         public void onFlowAdded(FlowAdded notification) {
243                 System.out.println("added flow..........................");
244         addedFlows.add(notification);
245             }
246         
247             @Override
248             public void onFlowRemoved(FlowRemoved notification) {
249                 removedFlows.add(notification);
250             };
251         
252             @Override
253             public void onFlowUpdated(FlowUpdated notification) {
254                 updatedFlows.add(notification);
255             }
256         
257         }
258         
259         final class FlowDataListener implements DataChangeListener {
260                 private SalFlowService flowService;
261                 
262                 public FlowDataListener() {
263                         
264                 }
265                 
266                 @Override
267                 public void onDataChanged(
268                                 DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {                    
269                         System.out.println("Coming in onDataChange..............");
270                         @SuppressWarnings("unchecked")
271                         Collection<DataObject> additions = (Collection<DataObject>) change.getCreatedConfigurationData();
272                         // we can check for getCreated, getDeleted or getUpdated from DataChange Event class
273                         for (DataObject dataObject : additions) {
274                             if (dataObject instanceof NodeFlow) {
275                                 NodeRef nodeOne = createNodeRef("foo:node:1");
276                                         // validating the dataObject here
277                                     AddFlowInputBuilder input = new AddFlowInputBuilder();
278                                     input.setNode(((NodeFlow) dataObject).getNode());
279                                     input.setNode(nodeOne);
280                                           //  input.setPriority(((NodeFlow) dataObject).getPriority());
281                                         //input.setMatch(((NodeFlow) dataObject).getMatch());
282                                         //input.setFlowTable(((NodeFlow) dataObject).getFlowTable());
283                                         //input.setCookie(((NodeFlow) dataObject).getCookie());
284                                         //input.setAction(((NodeFlow) dataObject).getAction());
285         
286                                 @SuppressWarnings("unused")
287                                         Future<RpcResult<java.lang.Void>> result = flowService.addFlow(input.build());
288                     }
289                         }       
290                 } 
291         }
292                                 
293             
294             
295     private static NodeRef createNodeRef(String string) {
296         NodeKey key = new NodeKey(new NodeId(string));
297         InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
298                 .toInstance();
299
300         return new NodeRef(path);
301     }
302             
303           /*  private void loadFlowData() {
304         
305                     DataModification modification = (DataModification) dataservice.beginTransaction();
306                     String id = "abc";
307                     FlowKey key = new FlowKey(id, new NodeRef());
308                     InstanceIdentifier<?> path1;
309                     FlowBuilder flow = new FlowBuilder();
310                     flow.setKey(key);
311                     path1 = InstanceIdentifier.builder().node(Flows.class).node(Flow.class, key).toInstance();
312                     DataObject cls = (DataObject) modification.readConfigurationData(path);
313                     modification.putConfigurationData(path, flow.build());
314                     modification.commit();
315                 }*/
316
317 }