import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.groupbasedpolicy.dto.EgKey;
import org.opendaylight.groupbasedpolicy.dto.EpKey;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.endpoint.EndpointManager;
import org.opendaylight.groupbasedpolicy.util.IidFactory;
import org.opendaylight.groupbasedpolicy.util.SingletonTask;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroup;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroupBuilder;
private static final Logger LOG =
LoggerFactory.getLogger(PolicyManager.class);
+ private Map<InstanceIdentifier<Table>, TableBuilder> actualGbpFlows = new HashMap<>();
+ private Map<InstanceIdentifier<Table>, TableBuilder> previousGbpFlows = new HashMap<>();
+
private short tableOffset;
private static final short TABLEID_PORTSECURITY = 0;
private static final short TABLEID_INGRESS_NAT = 1;
public PolicyManager(DataBroker dataBroker,
SwitchManager switchManager,
EndpointManager endpointManager,
- RpcProviderRegistry rpcRegistry,
ScheduledExecutorService executor,
short tableOffset) {
super();
for (Short tableId : tableIDs) {
for (NodeId nodeId : switchManager.getReadySwitches()) {
final InstanceIdentifier<Table> tablePath = FlowUtils.createTablePath(nodeId, tableId);
- checkList.add(deteleTableIfExists(rwTx, tablePath));
+ checkList.add(deleteTableIfExists(rwTx, tablePath));
}
}
ListenableFuture<List<Void>> allAsListFuture = Futures.allAsList(checkList);
return tableIds;
}
- private ListenableFuture<Void> deteleTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier<Table> tablePath){
+ private ListenableFuture<Void> deleteTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier<Table> tablePath){
return Futures.transform(rwTx.read(LogicalDatastoreType.CONFIGURATION, tablePath), new Function<Optional<Table>, Void>() {
@Override
/**
 * Schedules an asynchronous flow-table update by (re)arming the {@code flowUpdateTask}
 * {@link SingletonTask} with a {@code FLOW_UPDATE_DELAY} millisecond delay.
 * Before rescheduling, the current flow snapshot is rotated: {@code actualGbpFlows}
 * becomes {@code previousGbpFlows} so the update task can diff new flows against it.
 * No-op when {@code switchManager} is not yet available.
 */
private void scheduleUpdate() {
if (switchManager != null) {
LOG.trace("Scheduling flow update task");
+
+ // Mark all existing flows as previous - will be compared with new ones
+ previousGbpFlows = actualGbpFlows;
+ actualGbpFlows = new HashMap<>();
+
// Reschedule rather than submit: SingletonTask coalesces bursts of
// scheduleUpdate() calls into a single delayed run.
flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
}
}
CompletionService<Void> ecs
= new ExecutorCompletionService<>(executor);
- int n = 0;
OfWriter ofWriter = new OfWriter();
SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(ofWriter);
ecs.submit(swut);
- n+=1;
-
- for (int i = 0; i < n; i++) {
- try {
- ecs.take().get();
- ofWriter.commitToDataStore(dataBroker);
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Failed to update flow tables", e);
- }
+
+ try {
+ ecs.take().get();
+ // Keep the freshly written flows as the current snapshot; on the next
+ // scheduled run they are rotated into previousGbpFlows and diffed against.
+ actualGbpFlows.putAll(ofWriter.commitToDataStore(dataBroker, previousGbpFlows));
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Failed to update flow tables", e);
}
LOG.debug("Flow update completed");
}