6a1778c40db6b8068ce01bdc98cd091e12525c56
[l2switch.git] / arphandler / src / main / java / org / opendaylight / l2switch / arphandler / core / ProactiveFloodFlowWriter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.l2switch.arphandler.core;
9
10 import com.google.common.base.Optional;
11 import com.google.common.collect.ImmutableList;
12 import java.math.BigInteger;
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.List;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.Future;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicLong;
22 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
23 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
24 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
25 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
26 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.openflowplugin.api.OFConstants;
29 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.OutputActionCaseBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.output.action._case.OutputActionBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.ActionBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.ActionKey;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowModFlags;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.OutputPortValues;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.InstructionsBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.MatchBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.instruction.ApplyActionsCaseBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.instruction.apply.actions._case.ApplyActions;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.instruction.apply.actions._case.ApplyActionsBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.InstructionBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.loopremover.rev140714.StpStatus;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.loopremover.rev140714.StpStatusAwareNodeConnector;
71 import org.opendaylight.yangtools.concepts.ListenerRegistration;
72 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
73 import org.opendaylight.yangtools.yang.common.RpcResult;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
77 /**
78  * ProactiveFloodFlowWriter is used for the proactive mode of L2Switch. In this
79  * mode, flood flows are automatically written to each switch and less traffic
80  * is sent to the controller.
81  */
82 public class ProactiveFloodFlowWriter implements DataTreeChangeListener<StpStatusAwareNodeConnector>,
83         OpendaylightInventoryListener {
84
85     private static final Logger LOG = LoggerFactory.getLogger(ProactiveFloodFlowWriter.class);
86
87     private static final String FLOW_ID_PREFIX = "L2switch-";
88
89     private final DataBroker dataBroker;
90     private final SalFlowService salFlowService;
91     private final ScheduledExecutorService stpStatusDataChangeEventProcessor = Executors.newScheduledThreadPool(1);
92     private volatile boolean flowRefreshScheduled = false;
93     private volatile boolean threadReschedule = false;
94     private long flowInstallationDelay;
95     private short flowTableId;
96     private int flowPriority;
97     private int flowIdleTimeout;
98     private int flowHardTimeout;
99     private final AtomicLong flowIdInc = new AtomicLong();
100     private final AtomicLong flowCookieInc = new AtomicLong(0x2b00000000000000L);
101
102     public ProactiveFloodFlowWriter(DataBroker dataBroker, SalFlowService salFlowService) {
103         this.dataBroker = dataBroker;
104         this.salFlowService = salFlowService;
105     }
106
107     public void setFlowInstallationDelay(long flowInstallationDelay) {
108         this.flowInstallationDelay = flowInstallationDelay;
109     }
110
111     public void setFlowTableId(short flowTableId) {
112         this.flowTableId = flowTableId;
113     }
114
115     public void setFlowPriority(int flowPriority) {
116         this.flowPriority = flowPriority;
117     }
118
119     public void setFlowIdleTimeout(int flowIdleTimeout) {
120         this.flowIdleTimeout = flowIdleTimeout;
121     }
122
123     public void setFlowHardTimeout(int flowHardTimeout) {
124         this.flowHardTimeout = flowHardTimeout;
125     }
126
127     @Override
128     public void onNodeConnectorRemoved(NodeConnectorRemoved notification) {
129         // do nothing
130     }
131
132     @Override
133     public void onNodeConnectorUpdated(NodeConnectorUpdated notification) {
134         // do nothing
135     }
136
137     @Override
138     public void onNodeRemoved(NodeRemoved notification) {
139         // do nothing
140     }
141
142     /**
143      * Install flood flows when a node comes up/down.
144      *
145      * @param notification
146      *            Notification for when a node comes up.
147      */
148     @Override
149     public void onNodeUpdated(NodeUpdated notification) {
150         if (!flowRefreshScheduled) {
151             synchronized (this) {
152                 if (!flowRefreshScheduled) {
153                     stpStatusDataChangeEventProcessor.schedule(new StpStatusDataChangeEventProcessor(),
154                             flowInstallationDelay, TimeUnit.MILLISECONDS);
155                     flowRefreshScheduled = true;
156                     LOG.debug("Scheduled Flows for refresh.");
157                 }
158             }
159         } else {
160             LOG.debug("Already scheduled for flow refresh.");
161             threadReschedule = true;
162         }
163     }
164
165     /**
166      * Registers as a data listener for Nodes.
167      */
168     public ListenerRegistration<ProactiveFloodFlowWriter> registerAsDataChangeListener() {
169         InstanceIdentifier<StpStatusAwareNodeConnector> path = InstanceIdentifier.<Nodes>builder(Nodes.class)
170                 .<Node>child(Node.class).<NodeConnector>child(NodeConnector.class)
171                 .<StpStatusAwareNodeConnector>augmentation(StpStatusAwareNodeConnector.class).build();
172         return dataBroker.registerDataTreeChangeListener(new DataTreeIdentifier<>(
173                 LogicalDatastoreType.OPERATIONAL, path), this);
174     }
175
176     /**
177      * Install flows when a link comes up/down.
178      */
179     @Override
180     public void onDataTreeChanged(Collection<DataTreeModification<StpStatusAwareNodeConnector>> changes) {
181         if (!flowRefreshScheduled) {
182             synchronized (this) {
183                 if (!flowRefreshScheduled) {
184                     stpStatusDataChangeEventProcessor.schedule(new StpStatusDataChangeEventProcessor(),
185                             flowInstallationDelay, TimeUnit.MILLISECONDS);
186                     flowRefreshScheduled = true;
187                     LOG.debug("Scheduled Flows for refresh.");
188                 }
189             }
190         } else {
191             LOG.debug("Already scheduled for flow refresh.");
192             threadReschedule = true;
193         }
194     }
195
196     private class StpStatusDataChangeEventProcessor implements Runnable {
197         @Override
198         public void run() {
199             LOG.debug("In flow refresh thread.");
200             if (threadReschedule) {
201                 LOG.debug("Rescheduling thread");
202                 stpStatusDataChangeEventProcessor.schedule(this, flowInstallationDelay, TimeUnit.MILLISECONDS);
203                 threadReschedule = false;
204                 return;
205             }
206
207             flowRefreshScheduled = false;
208             installFloodFlows();
209         }
210
211         /**
212          * Installs a FloodFlow on each node.
213          */
214         private void installFloodFlows() {
215             Nodes nodes = null;
216             try {
217                 InstanceIdentifier.InstanceIdentifierBuilder<Nodes> nodesInsIdBuilder = InstanceIdentifier
218                         .<Nodes>builder(Nodes.class);
219                 ReadOnlyTransaction readOnlyTransaction = dataBroker.newReadOnlyTransaction();
220                 Optional<Nodes> dataObjectOptional = null;
221                 dataObjectOptional = readOnlyTransaction
222                         .read(LogicalDatastoreType.OPERATIONAL, nodesInsIdBuilder.build()).get();
223                 if (dataObjectOptional.isPresent()) {
224                     nodes = dataObjectOptional.get();
225                 }
226                 readOnlyTransaction.close();
227             } catch (InterruptedException e) {
228                 LOG.error("Failed to read nodes from Operation data store.");
229                 throw new RuntimeException("Failed to read nodes from Operation data store.", e);
230             } catch (ExecutionException e) {
231                 LOG.error("Failed to read nodes from Operation data store.");
232                 throw new RuntimeException("Failed to read nodes from Operation data store.", e);
233             }
234
235             if (nodes == null) {
236                 // Reschedule thread when the data store read had errors
237                 LOG.debug("Rescheduling flow refresh thread because datastore read failed.");
238                 if (!flowRefreshScheduled) {
239                     flowRefreshScheduled = true;
240                     stpStatusDataChangeEventProcessor.schedule(this, flowInstallationDelay, TimeUnit.MILLISECONDS);
241                 }
242             } else {
243                 for (Node node : nodes.getNode()) {
244                     // Install a FloodFlow on each node
245                     List<NodeConnector> nodeConnectors = node.getNodeConnector();
246                     if (nodeConnectors != null) {
247                         for (NodeConnector outerNodeConnector : nodeConnectors) {
248                             StpStatusAwareNodeConnector outerSaNodeConnector = outerNodeConnector
249                                     .getAugmentation(StpStatusAwareNodeConnector.class);
250                             if (outerSaNodeConnector != null
251                                     && StpStatus.Discarding.equals(outerSaNodeConnector.getStatus())) {
252                                 continue;
253                             }
254                             if (!outerNodeConnector.getId().toString().contains("LOCAL")) {
255                                 ArrayList<Action> outputActions = new ArrayList<>();
256                                 for (NodeConnector nodeConnector : nodeConnectors) {
257                                     if (!nodeConnector.getId().toString().contains("LOCAL")
258                                             && !outerNodeConnector.equals(nodeConnector)) {
259                                         // NodeConnectors without STP status
260                                         // (external ports) and NodeConnectors
261                                         // that are "forwarding" will be flooded
262                                         // on
263                                         StpStatusAwareNodeConnector saNodeConnector = nodeConnector
264                                                 .getAugmentation(StpStatusAwareNodeConnector.class);
265                                         if (saNodeConnector == null
266                                                 || StpStatus.Forwarding.equals(saNodeConnector.getStatus())) {
267                                             outputActions.add(new ActionBuilder() //
268                                                     .setOrder(
269                                                             0)
270                                                     .setAction(
271                                                             new OutputActionCaseBuilder() //
272                                                                     .setOutputAction(new OutputActionBuilder() //
273                                                                             .setMaxLength(0xffff) //
274                                                                             .setOutputNodeConnector(
275                                                                                     nodeConnector.getId()) //
276                                                                             .build()) //
277                                                                     .build()) //
278                                                     .build());
279                                         }
280                                     }
281                                 }
282
283                                 // Add controller port to outputActions for
284                                 // external ports only
285                                 if (outerSaNodeConnector == null) {
286                                     outputActions.add(new ActionBuilder().setOrder(0).setKey(new ActionKey(0))
287                                             .setAction(
288                                                     new OutputActionCaseBuilder()
289                                                             .setOutputAction(new OutputActionBuilder()
290                                                                     .setMaxLength(0xffff)
291                                                                     .setOutputNodeConnector(new Uri(
292                                                                             OutputPortValues.CONTROLLER.toString()))
293                                                                     .build())
294                                                             .build())
295                                             .build());
296                                 }
297
298                                 // Create an Apply Action
299                                 ApplyActions applyActions = new ApplyActionsBuilder() //
300                                         .setAction(ImmutableList.copyOf(outputActions)).build();
301
302                                 // Wrap our Apply Action in an Instruction
303                                 Instruction applyActionsInstruction = new InstructionBuilder() //
304                                         .setOrder(0)
305                                         .setInstruction(new ApplyActionsCaseBuilder()//
306                                                 .setApplyActions(applyActions) //
307                                                 .build()) //
308                                         .build();
309
310                                 FlowBuilder floodFlowBuilder = createBaseFlowForPortMatch(outerNodeConnector);
311                                 floodFlowBuilder.setInstructions(new InstructionsBuilder() //
312                                         .setInstruction(ImmutableList.of(applyActionsInstruction)) //
313                                         .build()); //
314
315                                 writeFlowToSwitch(node.getId(), floodFlowBuilder.build());
316                             }
317                         }
318                     }
319                 }
320             }
321         }
322
323         private FlowBuilder createBaseFlowForPortMatch(NodeConnector nc) {
324             FlowBuilder floodFlow = new FlowBuilder().setTableId(flowTableId).setFlowName("flood");
325             floodFlow.setId(new FlowId(Long.toString(floodFlow.hashCode())));
326
327             Match match = new MatchBuilder().setInPort(nc.getId()).build();
328
329             floodFlow.setMatch(match) //
330                     .setPriority(flowPriority) //
331                     .setBufferId(OFConstants.OFP_NO_BUFFER) //
332                     .setHardTimeout(flowHardTimeout) //
333                     .setIdleTimeout(flowIdleTimeout) //
334                     .setCookie(new FlowCookie(BigInteger.valueOf(flowCookieInc.getAndIncrement())))
335                     .setFlags(new FlowModFlags(false, false, false, false, false));
336             return floodFlow;
337         }
338
339         /**
340          * Starts and commits data change transaction which modifies provided
341          * flow path with supplied body.
342          */
343         private Future<RpcResult<AddFlowOutput>> writeFlowToSwitch(NodeId nodeId, Flow flow) {
344             InstanceIdentifier<Node> nodeInstanceId = InstanceIdentifier.<Nodes>builder(Nodes.class)
345                     .<Node, NodeKey>child(Node.class, new NodeKey(nodeId)).build();
346             InstanceIdentifier<Table> tableInstanceId = nodeInstanceId
347                     .<FlowCapableNode>augmentation(FlowCapableNode.class)
348                     .<Table, TableKey>child(Table.class, new TableKey(flowTableId));
349             InstanceIdentifier<Flow> flowPath = tableInstanceId.<Flow, FlowKey>child(Flow.class,
350                     new FlowKey(new FlowId(FLOW_ID_PREFIX + String.valueOf(flowIdInc.getAndIncrement()))));
351
352             final AddFlowInputBuilder builder = new AddFlowInputBuilder(flow).setNode(new NodeRef(nodeInstanceId))
353                     .setFlowTable(new FlowTableRef(tableInstanceId)).setFlowRef(new FlowRef(flowPath))
354                     .setTransactionUri(new Uri(flow.getId().getValue()));
355             return salFlowService.addFlow(builder.build());
356         }
357     }
358 }