policyManager = new PolicyManager(dataProvider,
switchManager,
endpointManager,
- rpcRegistry,
executor,
tableOffset);
ofOverlayAug = new OfOverlayAug(dataProvider, epRendererAugmentationRegistry);
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
}
}
- public void commitToDataStore(DataBroker dataBroker) {
+ /**
+ * Updates groups and flows on every node.
+ * Only flows created by GBP - i.e. those present in actualFlowMap - can be removed. This ensures that
+ * no other flows are deleted.
+ * Newly created flows are returned and will be used as the actual state in the next update.
+ *
+ * @param actualFlowMap map of the flows which are currently present on all nodes
+ * @return map of the newly created flows; these become the "actual" flows in the next update
+ */
+ public Map<InstanceIdentifier<Table>, TableBuilder> commitToDataStore(DataBroker dataBroker,
+ Map<InstanceIdentifier<Table>, TableBuilder> actualFlowMap) {
+ Map<InstanceIdentifier<Table>, TableBuilder> actualFlows = new HashMap<>();
if (dataBroker != null) {
for (NodeId nodeId : groupIdsByNode.keySet()) {
}
}
- for (Map.Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
+ for (Map.Entry<InstanceIdentifier<Table>, TableBuilder> newEntry : flowMap.entrySet()) {
try {
- /*
- * Get the currently configured flows for
- * this table.
- */
- updateFlowTable(dataBroker, entry);
+ // Get actual flows on the same node/table
+ Map.Entry<InstanceIdentifier<Table>, TableBuilder> actualEntry = null;
+ for (Map.Entry<InstanceIdentifier<Table>, TableBuilder> a : actualFlowMap.entrySet()) {
+ if (a.getKey().equals(newEntry.getKey())) {
+ actualEntry = a;
+ }
+ }
+ // Get the currently configured flows for this table
+ updateFlowTable(dataBroker, newEntry, actualEntry);
+ actualFlows.put(newEntry.getKey(), newEntry.getValue());
} catch (Exception e) {
- LOG.warn("Couldn't read flow table {}", entry.getKey());
+ LOG.warn("Couldn't read flow table {}", newEntry.getKey());
}
}
}
+ return actualFlows;
}
- private void updateFlowTable(DataBroker dataBroker,
- Map.Entry<InstanceIdentifier<Table>, TableBuilder> entry)
+ private void updateFlowTable(DataBroker dataBroker, Map.Entry<InstanceIdentifier<Table>, TableBuilder> desiredFlowMap,
+ Map.Entry<InstanceIdentifier<Table>, TableBuilder> actualFlowMap)
throws ExecutionException, InterruptedException {
- // flows to update
- Set<Flow> update = new HashSet<>(entry.getValue().getFlow());
- // flows currently in the table
- Set<Flow> curr = new HashSet<>();
-
- final InstanceIdentifier<Table> tableIid = entry.getKey();
- ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
- Optional<Table> r = t.read(LogicalDatastoreType.CONFIGURATION, tableIid).get();
- if (r.isPresent()) {
- Table currentTable = r.get();
- curr = new HashSet<>(currentTable.getFlow());
+ // Actual state
+ List<Flow> actualFlows = new ArrayList<>();
+ if (actualFlowMap != null && actualFlowMap.getValue() != null) {
+ actualFlows = actualFlowMap.getValue().getFlow();
}
+ // New state
+ List<Flow> desiredFlows = new ArrayList<>(desiredFlowMap.getValue().getFlow());
// Sets with custom equivalence rules
- Set<Equivalence.Wrapper<Flow>> oldFlows = new HashSet<>(
- Collections2.transform(curr, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
- Set<Equivalence.Wrapper<Flow>> updatedFlows = new HashSet<>(
- Collections2.transform(update, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
-
- // what is still there but was not updated, needs to be deleted
- Sets.SetView<Equivalence.Wrapper<Flow>> deletions = Sets.difference(oldFlows, updatedFlows);
- // new flows (they were not there before)
- Sets.SetView<Equivalence.Wrapper<Flow>> additions = Sets.difference(updatedFlows, oldFlows);
+ Set<Equivalence.Wrapper<Flow>> wrappedActualFlows = new HashSet<>(
+ Collections2.transform(actualFlows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
+ Set<Equivalence.Wrapper<Flow>> wrappedDesiredFlows = new HashSet<>(
+ Collections2.transform(desiredFlows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
+
+ // GBP flows that were present before but are absent from the desired set will be removed
+ Sets.SetView<Equivalence.Wrapper<Flow>> deletions = Sets.difference(wrappedActualFlows, wrappedDesiredFlows);
+ // New flows (they were not there before)
+ Sets.SetView<Equivalence.Wrapper<Flow>> additions = Sets.difference(wrappedDesiredFlows, wrappedActualFlows);
+
+ final InstanceIdentifier<Table> tableIid = desiredFlowMap.getKey();
+ ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
if (!deletions.isEmpty()) {
for (Equivalence.Wrapper<Flow> wf : deletions) {
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.groupbasedpolicy.dto.EgKey;
import org.opendaylight.groupbasedpolicy.dto.EpKey;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.endpoint.EndpointManager;
import org.opendaylight.groupbasedpolicy.util.IidFactory;
import org.opendaylight.groupbasedpolicy.util.SingletonTask;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroup;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroupBuilder;
private static final Logger LOG =
LoggerFactory.getLogger(PolicyManager.class);
+ private Map<InstanceIdentifier<Table>, TableBuilder> actualGbpFlows = new HashMap<>();
+ private Map<InstanceIdentifier<Table>, TableBuilder> previousGbpFlows = new HashMap<>();
+
private short tableOffset;
private static final short TABLEID_PORTSECURITY = 0;
private static final short TABLEID_INGRESS_NAT = 1;
public PolicyManager(DataBroker dataBroker,
SwitchManager switchManager,
EndpointManager endpointManager,
- RpcProviderRegistry rpcRegistry,
ScheduledExecutorService executor,
short tableOffset) {
super();
for (Short tableId : tableIDs) {
for (NodeId nodeId : switchManager.getReadySwitches()) {
final InstanceIdentifier<Table> tablePath = FlowUtils.createTablePath(nodeId, tableId);
- checkList.add(deteleTableIfExists(rwTx, tablePath));
+ checkList.add(deleteTableIfExists(rwTx, tablePath));
}
}
ListenableFuture<List<Void>> allAsListFuture = Futures.allAsList(checkList);
return tableIds;
}
- private ListenableFuture<Void> deteleTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier<Table> tablePath){
+ private ListenableFuture<Void> deleteTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier<Table> tablePath){
return Futures.transform(rwTx.read(LogicalDatastoreType.CONFIGURATION, tablePath), new Function<Optional<Table>, Void>() {
@Override
private void scheduleUpdate() {
if (switchManager != null) {
LOG.trace("Scheduling flow update task");
+
+ // Mark all existing flows as previous - they will be compared against the new ones
+ previousGbpFlows = actualGbpFlows;
+ actualGbpFlows = new HashMap<>();
+
flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
}
}
CompletionService<Void> ecs
= new ExecutorCompletionService<>(executor);
- int n = 0;
OfWriter ofWriter = new OfWriter();
SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(ofWriter);
ecs.submit(swut);
- n+=1;
-
- for (int i = 0; i < n; i++) {
- try {
- ecs.take().get();
- ofWriter.commitToDataStore(dataBroker);
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Failed to update flow tables", e);
- }
+
+ try {
+ ecs.take().get();
+ // Remember the flows written in this run; scheduleUpdate() shifts them into
+ // previousGbpFlows so the next update can diff against them
+ actualGbpFlows.putAll(ofWriter.commitToDataStore(dataBroker, previousGbpFlows));
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Failed to update flow tables", e);
}
LOG.debug("Flow update completed");
}
private static short offSet = 0;
public MockPolicyManager(EndpointManager endpointManager) {
- super(null, null, endpointManager, null, null, offSet);
+ super(null, null, endpointManager, null, offSet);
}
}
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.HashMap;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Before;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.groupbasedpolicy.dto.EgKey;
import org.opendaylight.groupbasedpolicy.dto.EpKey;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.endpoint.EndpointManager;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.MacAddress;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.EndpointGroupId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.L2ContextId;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ListenableFuture;
public class PolicyManagerTest {
// constant values used by the tested class implementation
private static final short TABLEID_PORTSECURITY = 0;
- private static final short TABLEID_INGRESS_NAT = 1;
+ private static final short TABLEID_INGRESS_NAT = 1;
private static final short TABLEID_SOURCE_MAPPER = 2;
private static final short TABLEID_DESTINATION_MAPPER = 3;
private static final short TABLEID_POLICY_ENFORCER = 4;
private DataBroker dataBroker;
private SwitchManager switchManager;
- private EndpointManager endpointManager;
- private RpcProviderRegistry rpcRegistry;
- private ScheduledExecutorService executor;
private short tableOffset;
- private WriteTransaction writeTransaction;
private ReadWriteTransaction readWriteTransaction;
private NodeId nodeId;
@Before
public void setUp() {
+ EndpointManager endpointManager = mock(EndpointManager.class);
+ ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
dataBroker = mock(DataBroker.class);
switchManager = mock(SwitchManager.class);
- endpointManager = mock(EndpointManager.class);
- rpcRegistry = mock(RpcProviderRegistry.class);
- executor = mock(ScheduledExecutorService.class);
tableOffset = 5;
- writeTransaction = mock(WriteTransaction.class);
+ WriteTransaction writeTransaction = mock(WriteTransaction.class);
when(dataBroker.newWriteOnlyTransaction()).thenReturn(writeTransaction);
readWriteTransaction = mock(ReadWriteTransaction.class);
when(dataBroker.newReadWriteTransaction()).thenReturn(readWriteTransaction);
manager = new PolicyManager(dataBroker, switchManager,
- endpointManager, rpcRegistry, executor, tableOffset);
+ endpointManager, executor, tableOffset);
nodeId = mock(NodeId.class);
tableId = 5;
CheckedFuture<Void, TransactionCommitFailedException> submitFuture = mock(CheckedFuture.class);
when(readWriteTransaction.submit()).thenReturn(submitFuture);
- flowMap.commitToDataStore(dataBroker);
+ flowMap.commitToDataStore(dataBroker, new HashMap<InstanceIdentifier<Table>, TableBuilder>());
InOrder orderCheck = inOrder(readWriteTransaction);
- orderCheck.verify(readWriteTransaction).read(any(LogicalDatastoreType.class), any(InstanceIdentifier.class));
orderCheck.verify(readWriteTransaction).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class),
any(Flow.class), any(Boolean.class));
orderCheck.verify(readWriteTransaction).submit();
@Test
public void changeOpenFlowTableOffsetTest() throws Exception {
short tableOffset = 3;
- assertTrue(manager.changeOpenFlowTableOffset(tableOffset) instanceof ListenableFuture<?>);
+ assertTrue(manager.changeOpenFlowTableOffset(tableOffset) != null);
verify(switchManager, times(7)).getReadySwitches();
}