2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.Map.Entry;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.CompletionService;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.ExecutorCompletionService;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.TimeUnit;
22 import com.google.common.base.Function;
23 import com.google.common.base.Optional;
24 import com.google.common.collect.ImmutableList;
25 import com.google.common.util.concurrent.AsyncFunction;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
33 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.endpoint.EndpointManager;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
39 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
40 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.IngressNatMapper;
41 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
42 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
43 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
44 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
45 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
46 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
47 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
48 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
49 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
50 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
51 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
52 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
53 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
54 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
61 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
66 * Manage policies on switches by subscribing to updates from the
67 * policy resolver and information about endpoints from the endpoint
// Policy manager of the OF-overlay renderer. It implements the switch,
// policy and endpoint listener interfaces named below.
70 public class PolicyManager
71 implements SwitchListener, PolicyListener, EndpointListener {
72 private static final Logger LOG =
73 LoggerFactory.getLogger(PolicyManager.class);
// Offset added to every TABLEID_* constant by the getTABLEID_* accessors.
// Not final: changeOpenFlowTableOffset() overwrites it.
75 private short tableOffset;
// Positions of the pipeline tables relative to tableOffset.
76 private static final short TABLEID_PORTSECURITY = 0;
77 private static final short TABLEID_INGRESS_NAT = 1;
78 private static final short TABLEID_SOURCE_MAPPER = 2;
79 private static final short TABLEID_DESTINATION_MAPPER = 3;
80 private static final short TABLEID_POLICY_ENFORCER = 4;
81 private static final short TABLEID_EGRESS_NAT = 5;
// Highest relative position; verifyMaxTableId() uses it to compute the
// largest table ID for a given offset.
82 private static final short TABLEID_EXTERNAL_MAPPER = 6;
// Collaborators handed in through the constructor.
84 private final SwitchManager switchManager;
85 private final PolicyResolver policyResolver;
// Value returned by policyResolver.registerListener(this); endpoint groups
// are added to it in groupEndpointUpdated().
87 private final PolicyScope policyScope;
// Executor given to the SingletonTask, the OfContext and the completion
// service used by the flow update task.
89 private final ScheduledExecutorService executor;
// Task that scheduleUpdate() reschedules after FLOW_UPDATE_DELAY.
90 private final SingletonTask flowUpdateTask;
91 private final DataBroker dataBroker;
// Context object passed to every table of the flow pipeline.
92 private final OfContext ofCtx;
94 * The flow tables that make up the processing pipeline
// Not final: rebuilt by changeOpenFlowTableOffset() once the old tables
// have been removed.
96 private List<? extends OfTable> flowPipeline;
99 * The delay before triggering the flow update task in response to an
100 * event in milliseconds.
102 private final static int FLOW_UPDATE_DELAY = 250;
// Creates the policy manager and wires it to its collaborators. The steps
// visible here are: store the collaborators and the table offset, validate
// the offset with verifyMaxTableId(), write
// SubjectFeatures.OF_OVERLAY_FEATURES to the operational datastore when a
// data broker is supplied, register the action definitions with the policy
// resolver, build the OfContext and the flow pipeline, subscribe to policy,
// switch and endpoint events, and create the flow update task.
//
// Throws IllegalArgumentException when verifyMaxTableId() rejects the offset.
104 public PolicyManager(DataBroker dataBroker,
105 PolicyResolver policyResolver,
106 SwitchManager switchManager,
107 EndpointManager endpointManager,
108 RpcProviderRegistry rpcRegistry,
109 ScheduledExecutorService executor,
112 this.switchManager = switchManager;
113 this.executor = executor;
114 this.policyResolver = policyResolver;
115 this.dataBroker = dataBroker;
116 this.tableOffset = tableOffset;
118 // to validate against model
119 verifyMaxTableId(tableOffset);
120 } catch (IllegalArgumentException e) {
// Re-thrown with a startup-specific message; the original exception is
// kept as the cause.
// NOTE(review): the trailing "{}" looks like an SLF4J placeholder. Exception
// messages are not formatted, so it would appear literally (as would the
// "\n." sequence) - confirm and clean up.
121 throw new IllegalArgumentException("Failed to start OF-Overlay renderer\n."
122 + "Max. table ID would be out of range. Check config-subsystem.\n{}", e);
// The data broker may be null; the feature definitions are only written
// when one is available.
125 if (dataBroker != null) {
126 WriteTransaction t = dataBroker.newWriteOnlyTransaction();
127 t.put(LogicalDatastoreType.OPERATIONAL,
129 .builder(SubjectFeatureDefinitions.class)
131 SubjectFeatures.OF_OVERLAY_FEATURES);
// Make every action known to SubjectFeatures available to the resolver.
135 for(Entry<ActionDefinitionId, Action> entry : SubjectFeatures.getActions().entrySet()) {
136 policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue());
139 ofCtx = new OfContext(dataBroker, rpcRegistry,
140 this, policyResolver, switchManager,
141 endpointManager, executor);
// The pipeline is built after ofCtx because every table receives it.
143 flowPipeline = createFlowPipeline();
// Subscribe to events. The switch manager may be null, in which case no
// switch listener is registered.
145 policyScope = policyResolver.registerListener(this);
146 if (switchManager != null)
147 switchManager.registerListener(this);
148 endpointManager.registerListener(this);
150 flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
153 LOG.debug("Initialized OFOverlay policy manager");
156 private List<? extends OfTable> createFlowPipeline() {
157 // TODO - PORTSECURITY is kept in table 0.
158 // According to openflow spec,processing on vSwitch always starts from table 0.
159 // Packets will be droped if table 0 is empty.
160 // Alternative workaround - table-miss flow entries in table 0.
161 return ImmutableList.of(new PortSecurity(ofCtx, (short) 0),
162 new GroupTable(ofCtx),
163 new IngressNatMapper(ofCtx, getTABLEID_INGRESS_NAT()),
164 new SourceMapper(ofCtx, getTABLEID_SOURCE_MAPPER()),
165 new DestinationMapper(ofCtx, getTABLEID_DESTINATION_MAPPER()),
166 new PolicyEnforcer(ofCtx, getTABLEID_POLICY_ENFORCER()),
167 new EgressNatMapper(ofCtx, getTABLEID_EGRESS_NAT()),
168 new ExternalMapper(ofCtx, getTABLEID_EXTERNAL_MAPPER())
// Switches the pipeline to a new table offset. The offset is validated
// first; when validation fails the error is logged, nothing is changed and
// an already-completed future is returned. Otherwise the table IDs in use
// under the old offset are collected, the new offset is stored, those
// tables are removed, and the flow pipeline is rebuilt once the removal
// future completes.
173 * @param tableOffset the new offset value
174 * @return {@link ListenableFuture} to indicate that tables have been synced
176 public ListenableFuture<Void> changeOpenFlowTableOffset(final short tableOffset) {
178 verifyMaxTableId(tableOffset);
179 } catch (IllegalArgumentException e) {
// NOTE(review): 'e' is the only argument for the "{}" placeholder. Depending
// on the SLF4J version it is either rendered via toString() or treated as
// the throwable, leaving "{}" in the output - confirm the intent.
180 LOG.error("Cannot update table offset. Max. table ID would be out of range.\n{}", e);
181 // TODO - invalid offset value remains in conf DS
182 // It's not possible to validate offset value by using constraints in model,
183 // because number of tables in pipeline varies.
184 return Futures.immediateFuture(null);
// Capture the IDs before overwriting the offset: getTableIDs() reads the
// field, so calling it later would yield the new tables, not the old ones.
186 List<Short> tableIDs = getTableIDs();
187 this.tableOffset = tableOffset;
188 return Futures.transform(removeUnusedTables(tableIDs), new Function<Void, Void>() {
// Invoked after the old tables have been removed.
191 public Void apply(Void tablesRemoved) {
192 flowPipeline = createFlowPipeline();
200 * @param tableIDs - IDs of tables to delete
201 * @return ListenableFuture<Void> - which will be filled when clearing is done
203 private ListenableFuture<Void> removeUnusedTables(final List<Short> tableIDs) {
204 List<ListenableFuture<Void>> checkList = new ArrayList<>();
205 final ReadWriteTransaction rwTx = dataBroker.newReadWriteTransaction();
206 for (Short tableId : tableIDs) {
207 for (NodeId nodeId : switchManager.getReadySwitches()) {
208 final InstanceIdentifier<Table> tablePath = FlowUtils.createTablePath(nodeId, tableId);
209 checkList.add(deteleTableIfExists(rwTx, tablePath));
212 ListenableFuture<List<Void>> allAsListFuture = Futures.allAsList(checkList);
213 return Futures.transform(allAsListFuture, new AsyncFunction<List<Void>, Void>() {
216 public ListenableFuture<Void> apply(List<Void> readyToSubmit) {
217 return rwTx.submit();
222 private List<Short> getTableIDs() {
223 List<Short> tableIds = new ArrayList<>();
224 tableIds.add(getTABLEID_PORTSECURITY());
225 tableIds.add(getTABLEID_INGRESS_NAT());
226 tableIds.add(getTABLEID_SOURCE_MAPPER());
227 tableIds.add(getTABLEID_DESTINATION_MAPPER());
228 tableIds.add(getTABLEID_POLICY_ENFORCER());
229 tableIds.add(getTABLEID_EGRESS_NAT());
230 tableIds.add(getTABLEID_EXTERNAL_MAPPER());
234 private ListenableFuture<Void> deteleTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier<Table> tablePath){
235 return Futures.transform(rwTx.read(LogicalDatastoreType.CONFIGURATION, tablePath), new Function<Optional<Table>, Void>() {
238 public Void apply(Optional<Table> optTable) {
239 if(optTable.isPresent()){
240 rwTx.delete(LogicalDatastoreType.CONFIGURATION, tablePath);
250 public short getTABLEID_PORTSECURITY() {
251 return (short)(tableOffset+TABLEID_PORTSECURITY);
255 public short getTABLEID_INGRESS_NAT() {
256 return (short)(tableOffset+TABLEID_INGRESS_NAT);
260 public short getTABLEID_SOURCE_MAPPER() {
261 return (short)(tableOffset+TABLEID_SOURCE_MAPPER);
265 public short getTABLEID_DESTINATION_MAPPER() {
266 return (short)(tableOffset+TABLEID_DESTINATION_MAPPER);
270 public short getTABLEID_POLICY_ENFORCER() {
271 return (short)(tableOffset+TABLEID_POLICY_ENFORCER);
275 public short getTABLEID_EGRESS_NAT() {
276 return (short)(tableOffset+TABLEID_EGRESS_NAT);
280 public short getTABLEID_EXTERNAL_MAPPER() {
281 return (short)(tableOffset+TABLEID_EXTERNAL_MAPPER);
285 public TableId verifyMaxTableId(short tableOffset) {
286 return new TableId((short)(tableOffset+TABLEID_EXTERNAL_MAPPER));
290 public void switchReady(final NodeId nodeId) {
295 public void switchRemoved(NodeId sw) {
296 // XXX TODO purge switch flows
301 public void switchUpdated(NodeId sw) {
310 public void endpointUpdated(EpKey epKey) {
315 public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
// Endpoint callback for a change of an endpoint within an endpoint group.
// The visible statement adds the group, identified by its tenant ID and
// endpoint-group ID, to the policy scope obtained from the policy resolver.
// epKey is not used by the visible statement.
320 public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
321 policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
330 public void policyUpdated(Set<EgKey> updatedConsumers) {
339 * Set the learning mode to the specified value
340 * @param learningMode the learning mode to set
342 public void setLearningMode(LearningMode learningMode) {
350 private void scheduleUpdate() {
351 if (switchManager != null) {
352 LOG.trace("Scheduling flow update task");
353 flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
// Callable that walks the switches reported as ready by the switch manager
// and asks every table of the flow pipeline to update itself for that
// switch, passing the current policy and the shared OfWriter.
358 * Update the flows on a particular switch
360 private class SwitchFlowUpdateTask implements Callable<Void> {
// Writer handed to every OfTable.update() call made by this task.
361 private OfWriter ofWriter;
363 public SwitchFlowUpdateTask(OfWriter ofWriter) {
365 this.ofWriter = ofWriter;
369 public Void call() throws Exception {
370 for (NodeId node : switchManager.getReadySwitches()) {
// The current policy is fetched again for each switch.
371 PolicyInfo info = policyResolver.getCurrentPolicy();
374 for (OfTable table : flowPipeline) {
376 table.update(node, info, ofWriter);
377 } catch (Exception e) {
// A failing table is logged with its simple class name and the exception;
// it appears the loop then continues with the next table - confirm.
378 LOG.error("Failed to write Openflow table {}",
379 table.getClass().getSimpleName(), e);
388 * Update all flows on all switches as needed. Note that this will block
389 * one of the threads on the executor.
391 private class FlowUpdateTask implements Runnable {
394 LOG.debug("Beginning flow update task");
396 CompletionService<Void> ecs
397 = new ExecutorCompletionService<>(executor);
400 OfWriter ofWriter = new OfWriter();
402 SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(ofWriter);
406 for (int i = 0; i < n; i++) {
409 ofWriter.commitToDataStore(dataBroker);
410 } catch (InterruptedException | ExecutionException e) {
411 LOG.error("Failed to update flow tables", e);
414 LOG.debug("Flow update completed");