1 package org.opendaylight.controller.forwardingrulesmanager_mdsal.consumer.impl;
3 import java.util.ArrayList;
4 import java.util.Collection;
5 import java.util.HashMap;
6 import java.util.HashSet;
9 import java.util.Map.Entry;
11 import java.util.concurrent.ConcurrentHashMap;
12 import java.util.concurrent.Future;
14 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
15 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
16 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
17 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
18 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
19 import org.opendaylight.controller.sal.common.util.Rpcs;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
38 import org.opendaylight.yangtools.concepts.Registration;
39 import org.opendaylight.yangtools.yang.binding.DataObject;
40 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
41 import org.opendaylight.yangtools.yang.binding.NotificationListener;
42 import org.opendaylight.yangtools.yang.common.RpcResult;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
// Consumer that pushes configured flows to the flow RPC service (SalFlowService) and
// records flow notifications coming back. All wiring happens in the constructor.
46 public class FlowConsumerImpl {
// SLF4J logger shared by the outer class and its inner classes.
47 protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
// Receives FlowAdded/FlowRemoved/FlowUpdated notifications; registered in the constructor.
48 private FlowEventListener flowEventListener = new FlowEventListener();
// Registration handle returned when flowEventListener is registered with the notification service.
49 private Registration<NotificationListener> listener1Reg;
// RPC service used by addFlow()/addFlowTest() to send flows; may be null if the lookup fails.
50 private SalFlowService flowService;
// Data-change listener registered on the Flows path.
51 private FlowDataListener listener;
// Commit handler registered on the Flows path.
52 private FlowDataCommitHandler commitHandler;
// Cache of flows, allocated by allocateCaches() and read by internalTransaction.preparePutEntry().
// NOTE(review): declared with FlowKey keys, but preparePutEntry() looks it up with an
// InstanceIdentifier, and no put() into this map is visible in this listing — confirm.
53 private ConcurrentHashMap<FlowKey, Flow> originalSwView;
// Wires this consumer into the services exposed by FRMConsumerImpl: looks up the flow RPC
// service, registers a data-change listener and a commit handler on the Flows path, and
// registers a listener for flow notifications.
// NOTE(review): several original lines are not visible in this listing (closing braces and
// whatever follows each error branch), so whether the constructor bails out early after a
// failure cannot be confirmed from here.
// NOTE(review): no call to allocateCaches() is visible in this listing — confirm that
// originalSwView is allocated before the commit handler can run.
55 public FlowConsumerImpl() {
// Instance identifier for Flows; used for both registrations below.
56 InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder().node(Flows.class).toInstance();
57 flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);
// A null RPC service is reported both to the logger and to stdout.
59 if (null == flowService) {
60 logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
61 System.out.println("Consumer SAL Service is down or NULL.");
65 listener = new FlowDataListener();
// A null return from registerDataChangeListener is treated as a registration failure.
// NOTE(review): the stdout message below repeats the "SAL Service is down" text although
// the failure here is the listener registration.
67 if (null == FRMConsumerImpl.getDataBrokerService().registerDataChangeListener(path, listener)) {
68 logger.error("Failed to listen on flow data modifcation events");
69 System.out.println("Consumer SAL Service is down or NULL.");
// Subscribe flowEventListener to flow notifications and keep the registration handle.
74 listener1Reg = FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);
// NOTE(review): the error text below does not say that registration failed, and the
// stdout message is again the "SAL Service is down" text — likely copy/paste; confirm.
76 if (null == listener1Reg) {
77 logger.error("Listener to listen on flow data modifcation events");
78 System.out.println("Consumer SAL Service is down or NULL.");
82 System.out.println("-------------------------------------------------------------------");
// Register the commit handler on the same Flows path as the data-change listener.
84 commitHandler = new FlowDataCommitHandler();
85 FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
// Replaces originalSwView with a fresh, empty ConcurrentHashMap.
// NOTE(review): no caller of this method is visible in this listing — confirm where it is invoked.
88 private void allocateCaches() {
89 originalSwView = new ConcurrentHashMap<FlowKey, Flow>();
// Test helper: builds an AddFlowInput that carries only a node reference for the
// hard-coded node id "foo:node:1" and sends it through flowService.
// NOTE(review): the try/else structure of this method is not visible in this listing
// (only the catch line is), so the exact branch nesting is presumed — confirm against the file.
92 private void addFlowTest()
95 NodeRef nodeOne = createNodeRef("foo:node:1");
96 AddFlowInputBuilder input1 = new AddFlowInputBuilder();
// Only the node is set on the input; no match, priority or instructions are set here.
98 input1.setNode(nodeOne);
99 AddFlowInput firstMsg = input1.build();
// Prints the service when it is available.
101 if(null != flowService) {
102 System.out.println(flowService.toString());
// Presumably the else branch of the null check above — confirm.
106 System.out.println("ConsumerFlowService is NULL");
// The returned future is never inspected, hence the "unused" suppression.
// NOTE(review): this call dereferences flowService; whether it is covered by the null
// check above cannot be seen from this listing.
108 @SuppressWarnings("unused")
109 Future<RpcResult<java.lang.Void>> result1 = flowService.addFlow(firstMsg);
// Catches every Exception; the handler body is not visible in this listing.
112 } catch (Exception e) {
113 // TODO Auto-generated catch block
118 * Adds flow to the southbound plugin and our internal database
123 private void addFlow(InstanceIdentifier<?> path, Flow dataObject) {
// Copies node, priority, match, cookie and instructions from the configured flow into an
// AddFlowInput and sends it through flowService.
// NOTE(review): the path parameter is not used in any visible line, and no visible line
// stores the flow in originalSwView — confirm whether the cache is meant to be updated here.
125 AddFlowInputBuilder input = new AddFlowInputBuilder();
// NOTE(review): getInstructions() is dereferenced without a null check; this throws a
// NullPointerException if the flow carries no instructions container.
126 List<Instruction> inst = (dataObject).getInstructions().getInstruction();
127 input.setNode((dataObject).getNode());
128 input.setPriority((dataObject).getPriority());
129 input.setMatch((dataObject).getMatch());
130 input.setCookie((dataObject).getCookie());
131 input.setInstructions((dataObject).getInstructions());
// NOTE(review): the expression below has no terminator in this listing and its result is
// not used — looks like a leftover; confirm against the file.
132 dataObject.getMatch().getLayer3Match()
// Debug output: dumps each instruction to stdout.
133 for (int i=0;i<inst.size();i++) {
134 System.out.println("i = "+ i + inst.get(i).getInstruction().toString());
135 System.out.println("i = "+ i + inst.get(i).toString());
138 System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
140 // We send flow to the southbound plugin
// The future returned by addFlow is discarded, so RPC failures are not observed here.
141 flowService.addFlow(input.build());
// Executes the plan prepared by an internalTransaction. In the visible code only
// additions are acted on; the update and removal calls are commented out.
144 private void commitToPlugin(internalTransaction transaction) {
// Every planned addition is sent through addFlow().
145 for(Entry<InstanceIdentifier<?>, Flow> entry :transaction.additions.entrySet()) {
146 System.out.println("Coming add cc in FlowDatacommitHandler");
147 addFlow(entry.getKey(),entry.getValue());
// Updates are only announced on stdout; updateFlow() is commented out.
149 for(@SuppressWarnings("unused") Entry<InstanceIdentifier<?>, Flow> entry :transaction.updates.entrySet()) {
150 System.out.println("Coming update cc in FlowDatacommitHandler");
151 // updateFlow(entry.getKey(),entry.getValue());
// Removals are iterated, but removeFlow() is commented out, so no visible line acts on them.
154 for(@SuppressWarnings("unused") InstanceIdentifier<?> removal : transaction.removals) {
155 // removeFlow(removal);
// Commit handler that the enclosing class registers on the Flows path. For each incoming
// modification it creates an internalTransaction and lets it prepare its plan.
159 private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
// The method returns the raw DataCommitTransaction type; presumably that is why the
// unchecked warning is suppressed — confirm.
161 @SuppressWarnings("unchecked")
163 public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
164 // We should verify transaction
165 System.out.println("Coming in FlowDatacommitHandler");
// Wrap the modification and classify its entries into additions, updates and removals.
166 internalTransaction transaction = new internalTransaction(modification);
167 transaction.prepareUpdate();
// NOTE(review): the return statement is not visible in this listing; presumably the
// prepared transaction is returned — confirm.
// Commit transaction created by FlowDataCommitHandler.requestCommit(). It sorts the
// entries of one DataModification into additions, updates and removals (prepareUpdate),
// and hands that plan to commitToPlugin() when finish() is called.
172 private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
// The modification this transaction was created for.
174 private final DataModification<InstanceIdentifier<?>, DataObject> modification;
// Accessor for the wrapped modification; its body is not visible in this listing.
177 public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
181 public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
182 this.modification = modification;
// The plan, filled by prepareUpdate() and read by commitToPlugin().
185 Map<InstanceIdentifier<?>, Flow> additions = new HashMap<>();
186 Map<InstanceIdentifier<?>, Flow> updates = new HashMap<>();
187 Set<InstanceIdentifier<?>> removals = new HashSet<>();
190 * We create a plan which flows will be added, which will be updated and
191 * which will be removed based on our internal state.
194 void prepareUpdate() {
// Only entries whose value is a Flow are classified; other data objects are skipped.
196 Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
197 for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
198 if (entry.getValue() instanceof Flow) {
199 Flow flow = (Flow) entry.getValue();
200 preparePutEntry(entry.getKey(), flow);
// The removal set is taken from the modification as-is, replacing the initial empty HashSet.
205 removals = modification.getRemovedConfigurationData();
208 private void preparePutEntry(InstanceIdentifier<?> key, Flow flow) {
// A flow already present in originalSwView is planned as an update, otherwise as an addition.
// NOTE(review): originalSwView is declared with FlowKey keys but is queried here with an
// InstanceIdentifier; unless the two can compare equal, this lookup never matches and
// every flow is treated as an addition — confirm.
// NOTE(review): ConcurrentHashMap.get throws NullPointerException for a null key, and
// originalSwView itself is only assigned in allocateCaches() — confirm both are non-null here.
209 Flow original = originalSwView.get(key);
210 if (original != null) {
211 // It is update for us
212 System.out.println("Coming update in FlowDatacommitHandler");
213 updates.put(key, flow);
215 // It is addition for us
216 System.out.println("Coming add in FlowDatacommitHandler");
217 additions.put(key, flow);
222 * We are OK to go with execution of plan
226 public RpcResult<Void> finish() throws IllegalStateException {
// Executes the prepared plan and always reports success in the visible code.
228 commitToPlugin(this);
229 // We return true if internal transaction is successful.
230 // return Rpcs.getRpcResult(true, null, Collections.emptySet());
// NOTE(review): null is passed as the third argument where the commented-out variant
// above passes an empty set — confirm that callers tolerate a null value there.
231 return Rpcs.getRpcResult(true, null, null);
236 * We should rollback our preparation
240 public RpcResult<Void> rollback() throws IllegalStateException {
241 // NOOP - we did not modify any internal state during
242 // requestCommit phase
243 // return Rpcs.getRpcResult(true, null, Collections.emptySet());
// Same result construction as finish(), including the null third argument.
244 return Rpcs.getRpcResult(true, null, null);
// Notification listener that records every flow notification it receives, one list per
// notification type.
// NOTE(review): the lists are plain ArrayLists and no visible line removes entries, so they
// only grow; if notifications can arrive on more than one thread the access is
// unsynchronized — confirm the delivery threading model.
250 final class FlowEventListener implements SalFlowListener {
252 List<FlowAdded> addedFlows = new ArrayList<>();
253 List<FlowRemoved> removedFlows = new ArrayList<>();
254 List<FlowUpdated> updatedFlows = new ArrayList<>();
// Records the notification and prints a marker to stdout.
257 public void onFlowAdded(FlowAdded notification) {
258 System.out.println("added flow..........................");
259 addedFlows.add(notification);
// Records the notification.
263 public void onFlowRemoved(FlowRemoved notification) {
264 removedFlows.add(notification);
// Records the notification.
268 public void onFlowUpdated(FlowUpdated notification) {
269 updatedFlows.add(notification);
// Data-change listener that the enclosing class registers on the Flows path. For every
// created NodeFlow object it sends an AddFlowInput through its flowService field.
274 final class FlowDataListener implements DataChangeListener {
// NOTE(review): this field shadows the outer class's flowService, and no assignment to it
// is visible in this listing (the constructor body is not shown); if it is never set, the
// addFlow call below throws a NullPointerException — confirm.
275 private SalFlowService flowService;
277 public FlowDataListener() {
282 public void onDataChanged(
283 DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
284 System.out.println("Coming in onDataChange..............");
// NOTE(review): elsewhere in this file getUpdatedConfigurationData() is used as a Map
// (entrySet()); if getCreatedConfigurationData() also returns a Map, this cast to
// Collection fails at runtime — confirm the return type.
285 @SuppressWarnings("unchecked")
286 Collection<DataObject> additions = (Collection<DataObject>) change.getCreatedConfigurationData();
287 // we can check for getCreated, getDeleted or getUpdated from DataChange Event class
288 for (DataObject dataObject : additions) {
// Only NodeFlow objects are handled; everything else is skipped.
289 if (dataObject instanceof NodeFlow) {
290 NodeRef nodeOne = createNodeRef("foo:node:1");
291 // validating the dataObject here
292 AddFlowInputBuilder input = new AddFlowInputBuilder();
// The node taken from the data object is immediately overwritten by the hard-coded
// "foo:node:1" reference on the next line.
293 input.setNode(((NodeFlow) dataObject).getNode());
294 input.setNode(nodeOne);
295 // input.setPriority(((NodeFlow) dataObject).getPriority());
296 //input.setMatch(((NodeFlow) dataObject).getMatch());
297 //input.setFlowTable(((NodeFlow) dataObject).getFlowTable());
298 //input.setCookie(((NodeFlow) dataObject).getCookie());
299 //input.setAction(((NodeFlow) dataObject).getAction());
// The returned future is never inspected, hence the "unused" suppression.
301 @SuppressWarnings("unused")
302 Future<RpcResult<java.lang.Void>> result = flowService.addFlow(input.build());
// Builds a NodeRef pointing at the Node under Nodes whose key is the given node id string.
310 private static NodeRef createNodeRef(String string) {
311 NodeKey key = new NodeKey(new NodeId(string));
// The end of this builder chain is on a line not visible in this listing.
312 InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
315 return new NodeRef(path);