Adding back single flood flow.
[l2switch.git] / l2switch-main / src / main / java / org / opendaylight / l2switch / arphandler / ProactiveFloodFlowWriter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.l2switch.arphandler;
9
10 import com.google.common.base.Optional;
11 import com.google.common.collect.ImmutableList;
12 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
13 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
14 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
15 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
16 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
17 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
18 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.OutputActionCaseBuilder;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.output.action._case.OutputActionBuilder;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.ActionBuilder;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.ActionKey;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowModFlags;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.OutputPortValues;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.InstructionsBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.MatchBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.instruction.ApplyActionsCaseBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.instruction.apply.actions._case.ApplyActions;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.instruction.apply.actions._case.ApplyActionsBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.InstructionBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.loopremover.rev140714.StpStatus;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.loopremover.rev140714.StpStatusAwareNodeConnector;
55 import org.opendaylight.yangtools.concepts.ListenerRegistration;
56 import org.opendaylight.yangtools.yang.binding.DataObject;
57 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
58 import org.opendaylight.yangtools.yang.common.RpcResult;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 import java.math.BigInteger;
63 import java.util.ArrayList;
64 import java.util.List;
65 import java.util.Map;
66 import java.util.Set;
67 import java.util.concurrent.ExecutionException;
68 import java.util.concurrent.Executors;
69 import java.util.concurrent.Future;
70 import java.util.concurrent.ScheduledExecutorService;
71 import java.util.concurrent.TimeUnit;
72 import java.util.concurrent.atomic.AtomicLong;
73
74 /**
75  * ProactiveFloodFlowWriter is used for the proactive mode of L2Switch.
76  * In this mode, flood flows are automatically written to each switch and less traffic is sent to the controller.
77  */
78 public class ProactiveFloodFlowWriter implements DataChangeListener {
79
80   private static final Logger _logger = LoggerFactory.getLogger(ProactiveFloodFlowWriter.class);
81   private final DataBroker dataBroker;
82   private final SalFlowService salFlowService;
83   private final ScheduledExecutorService stpStatusDataChangeEventProcessor = Executors.newScheduledThreadPool(1);
84   private boolean flowRefreshScheduled = false;
85   private final long DEFAULT_DELAY = 2;
86   private final short FLOW_TABLE_ID = 0;//TODO:hard coded to 0 may need change if multiple tables are used.
87   private AtomicLong flowIdInc = new AtomicLong();
88   private AtomicLong flowCookieInc = new AtomicLong(0x2b00000000000000L);
89
90   public ProactiveFloodFlowWriter(DataBroker dataBroker, SalFlowService salFlowService) {
91     this.dataBroker = dataBroker;
92     this.salFlowService = salFlowService;
93   }
94
95   /**
96    * Registers as a data listener to receive changes done to
97    * {@link org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link}
98    * under {@link org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology}
99    * operation data root.
100    */
101   public ListenerRegistration<DataChangeListener> registerAsDataChangeListener() {
102     InstanceIdentifier<StpStatusAwareNodeConnector> path = InstanceIdentifier.<Nodes>builder(Nodes.class)
103         .<Node>child(Node.class)
104         .<NodeConnector>child(NodeConnector.class)
105         .<StpStatusAwareNodeConnector>augmentation(StpStatusAwareNodeConnector.class)
106         .toInstance();
107     return dataBroker.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, path, this, AsyncDataBroker.DataChangeScope.BASE);
108   }
109
110   @Override
111   public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> dataChangeEvent) {
112     if(dataChangeEvent == null) {
113       return;
114     }
115     Map<InstanceIdentifier<?>, DataObject> createdData = dataChangeEvent.getCreatedData();
116     Set<InstanceIdentifier<?>> removedPaths = dataChangeEvent.getRemovedPaths();
117     Map<InstanceIdentifier<?>, DataObject> originalData = dataChangeEvent.getOriginalData();
118     boolean refreshFlows = (createdData != null && !createdData.isEmpty()) ||
119         (removedPaths != null && !removedPaths.isEmpty() && originalData != null && !originalData.isEmpty());
120
121     if(!refreshFlows) {
122       return;
123     }
124     if(!flowRefreshScheduled) {
125       synchronized(this) {
126         if(!flowRefreshScheduled) {
127           stpStatusDataChangeEventProcessor.schedule(new StpStatusDataChangeEventProcessor(), DEFAULT_DELAY, TimeUnit.SECONDS);
128           flowRefreshScheduled = true;
129           _logger.debug("Scheduled Flows for refresh.");
130         }
131       }
132     } else {
133       _logger.debug("Already scheduled for flow refresh.");
134     }
135   }
136
137   private class StpStatusDataChangeEventProcessor implements Runnable {
138     AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> instanceIdentifierDataObjectAsyncDataChangeEvent;
139
140     @Override
141     public void run() {
142       _logger.debug("In flow refresh thread.");
143       flowRefreshScheduled = false;
144       installFloodFlows();
145     }
146
147     /**
148      * Installs a FloodFlow on each node
149      */
150     private void installFloodFlows() {
151       Nodes nodes = null;
152       try {
153         InstanceIdentifier.InstanceIdentifierBuilder<Nodes> nodesInsIdBuilder = InstanceIdentifier.<Nodes>builder(Nodes.class);
154         ReadOnlyTransaction readOnlyTransaction = dataBroker.newReadOnlyTransaction();
155         Optional<Nodes> dataObjectOptional = null;
156         dataObjectOptional = readOnlyTransaction.read(LogicalDatastoreType.OPERATIONAL, nodesInsIdBuilder.toInstance()).get();
157         if(dataObjectOptional.isPresent()) {
158           nodes = (Nodes) dataObjectOptional.get();
159         }
160         readOnlyTransaction.close();
161       } catch(InterruptedException e) {
162         _logger.error("Failed to read nodes from Operation data store.");
163         throw new RuntimeException("Failed to read nodes from Operation data store.", e);
164       } catch(ExecutionException e) {
165         _logger.error("Failed to read nodes from Operation data store.");
166         throw new RuntimeException("Failed to read nodes from Operation data store.", e);
167       }
168
169       if(nodes != null) {
170         for(Node node : nodes.getNode()) {
171           // Install a FloodFlow on each node
172           List<NodeConnector> nodeConnectors = node.getNodeConnector();
173           if(nodeConnectors != null) {
174             ArrayList<Action> outputActions = new ArrayList<Action>();
175             for(NodeConnector nodeConnector : nodeConnectors) {
176               if(!nodeConnector.getId().toString().contains("LOCAL")) {
177                 // NodeConnectors without STP status (external ports) and NodeConnectors that are "forwarding" will be flooded on
178                 StpStatusAwareNodeConnector saNodeConnector = nodeConnector.getAugmentation(StpStatusAwareNodeConnector.class);
179                 if(saNodeConnector == null || StpStatus.Forwarding.equals(saNodeConnector.getStatus())) {
180                   outputActions.add(new ActionBuilder() //
181                       .setOrder(0)
182                       .setAction(new OutputActionCaseBuilder() //
183                           .setOutputAction(new OutputActionBuilder() //
184                               .setMaxLength(new Integer(0xffff)) //
185                               .setOutputNodeConnector(nodeConnector.getId()) //
186                               .build()) //
187                           .build()) //
188                       .build());
189                 }
190               }
191             }
192
193             // Add controller port to outputActions
194             outputActions.add(new ActionBuilder()
195                 .setOrder(0)
196                 .setKey(new ActionKey(0))
197                 .setAction(new OutputActionCaseBuilder()
198                     .setOutputAction(new OutputActionBuilder()
199                         .setMaxLength(new Integer(0xffff))
200                         .setOutputNodeConnector(new Uri(OutputPortValues.CONTROLLER.toString()))
201                         .build())
202                     .build())
203                 .build());
204
205             // Create an Apply Action
206             ApplyActions applyActions = new ApplyActionsBuilder().setAction(outputActions).build();
207
208             // Wrap our Apply Action in an Instruction
209             Instruction applyActionsInstruction = new InstructionBuilder() //
210                 .setOrder(0)
211                 .setInstruction(new ApplyActionsCaseBuilder()//
212                     .setApplyActions(applyActions) //
213                     .build()) //
214                 .build();
215
216             FlowBuilder floodFlowBuilder = createBaseFlow();
217             floodFlowBuilder.setInstructions(new InstructionsBuilder() //
218                 .setInstruction(ImmutableList.of(applyActionsInstruction)) //
219                 .build()); //
220
221             writeFlowToSwitch(node.getId(), floodFlowBuilder.build());
222           }
223         }
224       }
225     }
226
227     private FlowBuilder createBaseFlow() {
228       FlowBuilder floodFlow = new FlowBuilder()
229           .setTableId(FLOW_TABLE_ID)
230           .setFlowName("flood");
231       floodFlow.setId(new FlowId(Long.toString(floodFlow.hashCode())));
232
233       Match match = new MatchBuilder()
234           .build();
235
236       floodFlow
237           .setMatch(match) //
238           .setPriority(2) //
239           .setBufferId(0L) //
240           .setHardTimeout(0) //
241           .setIdleTimeout(0) //
242           .setCookie(new FlowCookie(BigInteger.valueOf(flowCookieInc.getAndIncrement())))
243           .setFlags(new FlowModFlags(false, false, false, false, false));
244       return floodFlow;
245     }
246
247     /**
248      * Starts and commits data change transaction which
249      * modifies provided flow path with supplied body.
250      */
251     private Future<RpcResult<AddFlowOutput>> writeFlowToSwitch(NodeId nodeId, Flow flow) {
252       InstanceIdentifier<Node> nodeInstanceId = InstanceIdentifier.<Nodes>builder(Nodes.class)
253           .<Node, NodeKey>child(Node.class, new NodeKey(nodeId)).toInstance();
254       InstanceIdentifier<Table> tableInstanceId = nodeInstanceId.<FlowCapableNode>augmentation(FlowCapableNode.class)
255           .<Table, TableKey>child(Table.class, new TableKey(FLOW_TABLE_ID));
256       InstanceIdentifier<Flow> flowPath = tableInstanceId
257           .<Flow, FlowKey>child(Flow.class, new FlowKey(new FlowId(String.valueOf(flowIdInc.getAndIncrement()))));
258
259       final AddFlowInputBuilder builder = new AddFlowInputBuilder(flow)
260           .setNode(new NodeRef(nodeInstanceId))
261           .setFlowTable(new FlowTableRef(tableInstanceId))
262           .setFlowRef(new FlowRef(flowPath))
263           .setTransactionUri(new Uri(flow.getId().getValue()));
264       return salFlowService.addFlow(builder.build());
265     }
266   }
267 }