Added support for OpFlex Endpoint Registry messages. 25/9725/2
authorThomas Bachman <tbachman@yahoo.com>
Thu, 31 Jul 2014 13:35:14 +0000 (13:35 +0000)
committerThomas Bachman <tbachman@yahoo.com>
Tue, 5 Aug 2014 06:06:14 +0000 (06:06 +0000)
This adds support for Endpoint Declaration and Endpoint Request
messages. It converts between OpFlex semantics to the semantics
needed by the Endpoint Registry.

Change-Id: Ic9c869e549ad3f70a5089ad2c9218754726aa1f4
Signed-off-by: Thomas Bachman <tbachman@yahoo.com>
34 files changed:
groupbasedpolicy/src/main/java/org/opendaylight/controller/config/yang/config/opflex_provider/impl/OpflexProviderModule.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/jsonrpc/JsonRpcEndpoint.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/jsonrpc/RpcMessage.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/EndpointListener.java [new file with mode: 0644]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/EndpointManager.java [new file with mode: 0644]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/EpKey.java [new file with mode: 0644]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/Identity.java [new file with mode: 0644]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/L2EprContext.java [new file with mode: 0644]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/L3EprContext.java [new file with mode: 0644]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexConnectionService.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexRenderer.java [new file with mode: 0644]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/PolicyManager.java [new file with mode: 0644]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/Role.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/EndpointDeclarationRequest.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/EndpointDeclarationResponse.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/EndpointPolicyUpdateRequest.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/EndpointPolicyUpdateResponse.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/EndpointRequestRequest.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/EndpointRequestResponse.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/IdentityRequest.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/IdentityResponse.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/PolicyResolutionRequest.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/PolicyResolutionResponse.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/PolicyTriggerRequest.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/PolicyTriggerResponse.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/PolicyUpdateRequest.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/PolicyUpdateResponse.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/StateReportRequest.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/StateReportResponse.java
groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/jsonrpc/JsonRpcEndpointTest.java
groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/IdentityTest.java [new file with mode: 0644]
groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/L2EprContextTest.java [new file with mode: 0644]
groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/L3EprContextTest.java [new file with mode: 0644]
groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexConnectionServiceTest.java

index 108db2c839ee822b8baf68e6df28c58be6bac988..b7bd43892e1261ecb4641c3ef8c5885076d274a7 100644 (file)
@@ -1,11 +1,6 @@
 package org.opendaylight.controller.config.yang.config.opflex_provider.impl;
 
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.groupbasedpolicy.renderer.opflex.OpflexConnectionService;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.OpflexRenderer;
 
 public class OpflexProviderModule extends org.opendaylight.controller.config.yang.config.opflex_provider.impl.AbstractOpflexProviderModule {
     public OpflexProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
@@ -23,24 +18,8 @@ public class OpflexProviderModule extends org.opendaylight.controller.config.yan
 
     @Override
     public java.lang.AutoCloseable createInstance() {
-        final OpflexConnectionService connectionService = new OpflexConnectionService();
-        DataBroker dataBrokerService = getDataBrokerDependency();
-
-        connectionService.setDataProvider(dataBrokerService);
-        final ListenerRegistration<DataChangeListener> dataChangeListenerRegistration =
-                dataBrokerService
-                .registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
-                        OpflexConnectionService.DISCOVERY_IID,
-                        connectionService, DataChangeScope.SUBTREE );
-
-        final class AutoCloseableConnectionService implements AutoCloseable {
-            @Override
-            public void close() throws Exception {
-                connectionService.stopping();
-                dataChangeListenerRegistration.close();
-            }
-        }
-        return new AutoCloseableConnectionService();
+        // TODO: getRpcRegistryDependency()
+        return new OpflexRenderer(getDataBrokerDependency(), null);   
     }
 
 }
index 92667d01314cbbeccd95af054f863420bec3dda7..a39099c7696368fa845c4db31bcbee571b3cdf25 100644 (file)
@@ -198,7 +198,6 @@ public class JsonRpcEndpoint implements ChannelFutureListener {
 
                 message = objectMapper.treeToValue(requestJson, callback.getClass());
                 message.setId(requestJson.get("id").asText());
-                //message.setMethod(requestJson.get("method").asText());
 
                 broker.publish(this, message);
             } catch (JsonProcessingException  e) {
index d729d4aed8c93f20c20a44d7560f8fcafdf6b5c7..69343f207a0c5904a5990aceae483af86e593d19 100644 (file)
@@ -29,4 +29,5 @@ public abstract class RpcMessage {
     public abstract void setId(String id);
     public abstract String getMethod();
     public abstract void setMethod(String method);
+    public abstract boolean valid();
 }
diff --git a/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/EndpointListener.java b/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/EndpointListener.java
new file mode 100644 (file)
index 0000000..92135d1
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import org.opendaylight.groupbasedpolicy.resolver.EgKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+
+/**
+ * A listener to events related to endpoints being added, removed or updated.
+ * @author readams
+ */
+public interface EndpointListener {
+    /**
+     * The endpoint with the specified layer 2 context and mac address has
+     * been added or updated
+     * @param epKey the key for the affected endpoint
+     */
+    public void endpointUpdated(EpKey epKey);
+    
+    /**
+     * An endpoint attached to a particular node have been added, removed,
+     * or updated
+     * @param nodeId the affected switch node
+     * @param epKey the key for the affected endpoint
+     */
+    public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey);
+    
+    /**
+     * An endpoint for an endpoint group have been added, removed, 
+     * or updated.
+     * @param egKey the key for the affected endpoint group
+     * @param epKey the key for the affected endpoint
+
+     */
+    public void groupEndpointUpdated(EgKey egKey, EpKey epKey);
+}
diff --git a/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/EndpointManager.java b/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/EndpointManager.java
new file mode 100644 (file)
index 0000000..71cee9b
--- /dev/null
@@ -0,0 +1,660 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.groupbasedpolicy.endpoint.AbstractEndpointRegistry;
+import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcBroker;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessageMap;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointDeclarationRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointDeclarationResponse;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointRequestRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointRequestResponse;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityRequest;
+import org.opendaylight.groupbasedpolicy.resolver.EgKey;
+import org.opendaylight.groupbasedpolicy.resolver.EndpointProvider;
+import org.opendaylight.groupbasedpolicy.util.SetUtils;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ConditionName;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.L2BridgeDomainId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.Endpoints;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.RegisterEndpointInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoint.fields.L3Address;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.Endpoint;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3Builder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+
+/**
+ * Keep track of endpoints on the system.  Maintain an index of endpoints
+ * and their locations for queries from agents.  The endpoint manager will maintain
+ * appropriate indexes only for agents that are attached to the current
+ * controller node.
+ * 
+ * In order to render the policy, we need to be able to efficiently enumerate
+ * all endpoints on a particular agent and also all the agents containing 
+ * each particular endpoint group
+ * @author tbachman
+ */
+public class EndpointManager 
+        extends AbstractEndpointRegistry 
+        implements AutoCloseable, DataChangeListener, 
+        EndpointProvider, RpcBroker.RpcCallback, 
+        L2EprContext.Callback, L3EprContext.Callback {
+    protected static final Logger LOG = 
+            LoggerFactory.getLogger(EndpointManager.class);
+    
+    private static final InstanceIdentifier<Endpoint> endpointsIid = 
+            InstanceIdentifier.builder(Endpoints.class)
+                .child(Endpoint.class).build();
+    private static final InstanceIdentifier<EndpointL3> endpointsL3Iid = 
+            InstanceIdentifier.builder(Endpoints.class)
+                .child(EndpointL3.class).build();
+    // TODO: hacks for now :(
+    private static final String NO_ENDPOINTS = "No endpoints found.";
+    private static final int DEFAULT_PRR = 1000;
+    
+    final ListenerRegistration<DataChangeListener> listenerReg;
+    
+    private OpflexConnectionService connectionService;
+    final ConcurrentHashMap<EpKey, Endpoint> endpoints =
+            new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<NodeId, Set<EpKey>> endpointsByNode =
+            new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<EgKey, Set<EpKey>> endpointsByGroup = 
+            new ConcurrentHashMap<>();
+    private RpcMessageMap messageMap = null;      
+
+    Set<L2EprContext> l2RpcCtxts = 
+            Collections.newSetFromMap(new ConcurrentHashMap<L2EprContext, Boolean>());            
+    Set<L3EprContext> l3RpcCtxts = 
+            Collections.newSetFromMap(new ConcurrentHashMap<L3EprContext, Boolean>());            
+    
+    private List<EndpointListener> listeners = new CopyOnWriteArrayList<>();
+
+    public EndpointManager(DataBroker dataProvider,
+                           RpcProviderRegistry rpcRegistry,
+                           ScheduledExecutorService executor,
+                           OpflexConnectionService connectionService) {
+        super(dataProvider, rpcRegistry, executor);
+        
+        if (dataProvider != null) {
+            listenerReg = dataProvider
+                    .registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, 
+                                                endpointsIid, 
+                                                this, 
+                                                DataChangeScope.ONE);
+        } else
+            listenerReg = null;
+
+        this.connectionService = connectionService;
+
+        /* Subscribe to EPR messages */
+        messageMap = new RpcMessageMap();
+        List<RpcMessage> messages = Role.ENDPOINT_REGISTRY.getMessages();
+        messageMap.addList(messages);
+        for (RpcMessage msg: messages) {
+            this.connectionService.subscribe(msg, this);
+        }
+        LOG.warn("Initialized OpFlex endpoint manager");
+    }
+
+    // ***************
+    // EndpointManager
+    // ***************
+
+    /**
+     * Add a {@link EndpointListener} to get notifications of switch events
+     * @param listener the {@link EndpointListener} to add
+     */
+    public void registerListener(EndpointListener listener) {
+        listeners.add(listener);
+    }
+    
+    /**
+     * Get a collection of endpoints attached to a particular switch
+     * @param nodeId the nodeId of the switch to get endpoints for
+     * @return a collection of {@link Endpoint} objects.
+     */
+    public Collection<Endpoint> getEndpointsForNode(NodeId nodeId) {
+        Collection<EpKey> ebn = endpointsByNode.get(nodeId);
+        if (ebn == null) return Collections.emptyList();
+        return Collections2.transform(ebn, indexTransform);
+    }
+
+    /**
+     * Get the endpoint object for the given key
+     * @param epKey the key
+     * @return the {@link Endpoint} corresponding to the key
+     */
+    public Endpoint getEndpoint(EpKey epKey) {
+        return endpoints.get(epKey);
+    }
+
+    // ****************
+    // EndpointProvider
+    // ****************
+
+    @Override
+    public Collection<Endpoint> getEndpointsForGroup(EgKey eg) {
+        Collection<EpKey> ebg = endpointsByGroup.get(eg);
+        if (ebg == null) return Collections.emptyList();
+        return Collections2.transform(ebg, indexTransform);
+    }
+
+    @Override
+    public List<ConditionName> getCondsForEndpoint(Endpoint endpoint) {
+        // XXX TODO consider group conditions as well.  Also need to notify
+        // endpoint updated if the endpoint group conditions change
+        if (endpoint.getCondition() != null)
+            return endpoint.getCondition();
+        return Collections.emptyList();
+    }
+
+    // ************************
+    // AbstractEndpointRegistry
+    // ************************
+    
+    @Override
+    protected EndpointBuilder buildEndpoint(RegisterEndpointInput input) {
+        // TODO: implement
+        return null;
+    }
+
+    @Override
+    protected EndpointL3Builder buildEndpointL3(RegisterEndpointInput input) {
+        return super.buildEndpointL3(input);
+    }
+    
+    // *************
+    // AutoCloseable
+    // *************
+
+    @Override
+    public void close() throws Exception {
+        if (listenerReg != null) listenerReg.close();
+        super.close();
+    }
+
+    // ******************
+    // DataChangeListener
+    // ******************
+
+    @Override
+    public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+        for (DataObject dao : change.getCreatedData().values()) {
+            if (dao instanceof Endpoint)
+                updateEndpoint(null, (Endpoint)dao);
+        }
+        for (InstanceIdentifier<?> iid : change.getRemovedPaths()) {
+            DataObject old = change.getOriginalData().get(iid);
+            if (old != null && old instanceof Endpoint)
+                updateEndpoint((Endpoint)old, null);
+        }
+        Map<InstanceIdentifier<?>,DataObject> d = change.getUpdatedData();
+        for (Entry<InstanceIdentifier<?>, DataObject> entry : d.entrySet()) {
+            if (!(entry.getValue() instanceof Endpoint)) continue;
+            DataObject old = change.getOriginalData().get(entry.getKey());
+            Endpoint oldEp = null;
+            if (old != null && old instanceof Endpoint)
+                oldEp = (Endpoint)old;
+            updateEndpoint(oldEp, (Endpoint)entry.getValue());
+        }
+    }
+    // **************
+    // Implementation
+    // **************
+
+    private void notifyEndpointUpdated(EpKey epKey) {
+        for (EndpointListener l : listeners) {
+            l.endpointUpdated(epKey);
+        }
+    }
+
+    private void notifyNodeEndpointUpdated(NodeId nodeId, EpKey epKey) {
+        for (EndpointListener l : listeners) {
+            l.nodeEndpointUpdated(nodeId, epKey);
+        }
+    }
+
+    private void notifyGroupEndpointUpdated(EgKey egKey, EpKey epKey) {
+        for (EndpointListener l : listeners) {
+            l.groupEndpointUpdated(egKey, epKey);
+        }
+    }
+
+    private Function<EpKey, Endpoint> indexTransform = 
+            new Function<EpKey, Endpoint>() {
+        @Override
+        public Endpoint apply(EpKey input) {
+            return endpoints.get(input);
+        }
+    };
+    
+    private boolean validEp(Endpoint endpoint) {
+        return (endpoint != null && endpoint.getTenant() != null && 
+                endpoint.getEndpointGroup() != null &&
+                endpoint.getL2Context() != null &&
+                endpoint.getMacAddress() != null);
+    }
+    
+    private NodeId getLocation(Endpoint endpoint) {
+        if (!validEp(endpoint)) 
+            return null;
+
+        // TODO: implement
+
+        return null;
+    }
+    
+    private EpKey getEpKey(Endpoint endpoint) {
+        if (!validEp(endpoint)) 
+            return null;
+        return new EpKey(endpoint.getL2Context(), endpoint.getMacAddress());
+    }
+    
+    private EgKey getEgKey(Endpoint endpoint) {
+        if (!validEp(endpoint)) 
+            return null;
+        return new EgKey(endpoint.getTenant(), endpoint.getEndpointGroup());
+    }
+    
+    private Set<EpKey> getEpNSet(NodeId location) {
+        return SetUtils.getNestedSet(location, endpointsByNode);
+    }
+
+    private Set<EpKey> getEpGSet(EgKey eg) {
+        return SetUtils.getNestedSet(eg, endpointsByGroup);
+    }
+    
+    /**
+     * Update the endpoint indexes.  Set newEp to null to remove.
+     */
+    protected void updateEndpoint(Endpoint oldEp, Endpoint newEp) {
+        // XXX TODO only keep track of endpoints that are attached 
+        // to switches that are actually connected to us
+        NodeId oldLoc = getLocation(oldEp);
+        NodeId newLoc = getLocation(newEp);
+
+        EgKey oldKey = getEgKey(oldEp);
+        EgKey newKey = getEgKey(newEp);
+
+        EpKey epKey = getEpKey(oldEp);
+        if (epKey == null) epKey = getEpKey(newEp);
+        if (epKey == null) return;
+
+        boolean notifyOldLoc = false;
+        boolean notifyNewLoc = false;
+        boolean notifyOldEg = false;
+        boolean notifyNewEg = false;
+        
+        if (newEp != null)
+            endpoints.put(epKey, newEp);
+
+        if (oldLoc != null && 
+            (newLoc == null || !oldLoc.equals(newLoc))) {
+            Set<EpKey> eps = getEpNSet(oldLoc);
+            eps.remove(epKey);
+            notifyOldLoc = true;
+        }
+        if (oldKey != null &&
+            (newKey == null || !oldKey.equals(newKey))) {
+            Set<EpKey> gns = getEpGSet(oldKey);
+            gns.remove(epKey);
+            notifyOldEg = true;
+        }
+
+        if (newLoc != null) {
+            Set<EpKey> eps = getEpNSet(newLoc);
+            eps.add(epKey);
+            LOG.debug("Endpoint {} added to node {}", epKey, newLoc);
+            notifyNewLoc = true;
+        }
+        if (newKey != null) {
+            Set<EpKey> gns = getEpGSet(newKey);
+            gns.add(epKey);
+            LOG.debug("Endpoint {} added to group {}", epKey, newKey);
+            notifyNewEg = true;
+        }
+
+        if (newEp == null)
+            endpoints.remove(epKey);
+        
+        notifyEndpointUpdated(epKey);
+
+        if (notifyOldLoc)
+            notifyNodeEndpointUpdated(oldLoc,epKey);
+        if (notifyNewLoc)
+            notifyNodeEndpointUpdated(newLoc,epKey);
+        if (notifyOldEg)
+            notifyGroupEndpointUpdated(oldKey, epKey);
+        if (notifyNewEg)
+            notifyGroupEndpointUpdated(newKey, epKey);
+    }
+    
+    /**
+     * This notification handles the OpFlex Endpoint messages.
+     * We should only receive quest messages. Responses are
+     * sent in a different context, as all requests result 
+     * in a Future to access the data store.
+     * 
+     * @param endpoint The JsonRpcEndpoint that received the request
+     * @param request The request message from the endpoint
+     */
+    @Override
+    public void callback(JsonRpcEndpoint endpoint, RpcMessage request) {
+
+        RpcMessage response = null;
+        if (messageMap.get(request.getMethod()) == null) {
+            LOG.warn("message {} was not subscribed to, but was delivered.", request);
+            return;
+        }
+        if (request instanceof IdentityRequest) {
+            connectionService.callback(endpoint, request);
+        }                
+        /*
+         * For declaration requests, we need to make sure that this
+         * EP is in our registry. Since we can have multiple identifiers,
+\        * we create a Set of endpoints.
+         */
+        
+        if (request instanceof EndpointDeclarationRequest) {
+            EndpointDeclarationRequest req = (EndpointDeclarationRequest)request;
+            EndpointDeclarationResponse msg = new EndpointDeclarationResponse();
+            msg.setId(request.getId());
+            response = msg;
+            
+            if (!req.valid() ||
+                (req.getParams().get(0).getIdentifier() == null) ||
+                (req.getParams().get(0).getIdentifier().size() <= 0)) {
+                LOG.warn("Invalid declaration request: {}", req);
+                // TODO: should return error reply?
+                return;
+            }
+            EndpointDeclarationRequest.Params params = req.getParams().get(0);
+            
+            /*
+             * Use the first identifier to determine the type of 
+             * identifier being passed to us, so we can install the
+             * EP into the appropriate EPR list
+             */
+            Identity id = 
+                    new Identity(req.getParams().get(0).getIdentifier().get(0));
+            if (id.isL2()) {
+                L2EprContext ctx = 
+                        new L2EprContext(endpoint, request, 
+                        params.getIdentifier().size(),
+                        dataProvider, executor);
+                ctx.setCallback(this);
+                ctx.createL2Ep(req.getParams().get(0).getContext(), id);
+            }
+            else if (id.isL3()) {
+                L3EprContext ctx = 
+                        new L3EprContext(endpoint, request, 
+                        params.getIdentifier().size(),
+                        dataProvider, executor);                
+                ctx.setCallback(this);
+                ctx.createL3Ep(req.getParams().get(0).getContext(), 
+                               req.getParams().get(0).getIdentifier(), id);
+            }
+        }
+        else if (request instanceof EndpointRequestRequest) {
+            EndpointRequestRequest req = (EndpointRequestRequest)request;
+
+            /*
+             * We query the EPR for the EP. This is an asynchronous
+             * operation, so we send the response in the callback
+             */
+            if (req.valid()) {
+                EndpointRequestRequest.Params params = req.getParams().get(0);
+
+                for (String id: params.getIdentifier()) {
+                    Identity i = new Identity(id);
+                    
+                    if (i.isL2()) {
+                        L2EprContext ctx = 
+                                new L2EprContext(endpoint, request, 
+                                params.getIdentifier().size(),
+                                dataProvider, executor);
+                        this.l2RpcCtxts.add(ctx);
+                        ctx.setCallback(this);
+                        ctx.lookupEndpoint(params.getContext(), id);
+                    }
+                    else if (i.isL3()) {
+                        L3EprContext ctx = 
+                                new L3EprContext(endpoint, request, 
+                                params.getIdentifier().size(),
+                                dataProvider, executor);
+                        this.l3RpcCtxts.add(ctx);                                                
+                        ctx.setCallback(this);
+                        ctx.lookupEndpoint(params.getContext(), id);                        
+                    }
+                }
+            }
+        }
+        
+        if (response != null) {
+            try {
+                endpoint.sendResponse(response);
+            }
+            catch (Throwable t) {
+                LOG.warn("Response {} could not be sent to {}", response, endpoint);
+            }
+        }
+    }
+
+    /**
+     * This notification handles the callback from a query 
+     * of the L2 Endpoint Registry
+     */
+    @Override
+    public void callback(L2EprContext ctx) {
+        if (!(ctx.getRequest() instanceof EndpointRequestRequest)) {
+            return;
+        }
+        EndpointRequestRequest req = 
+                (EndpointRequestRequest)ctx.getRequest();
+        EndpointRequestResponse response = new EndpointRequestResponse();
+        EndpointRequestResponse.Result result = 
+                new EndpointRequestResponse.Result();
+        EndpointRequestResponse.Endpoint endpoint = 
+                new EndpointRequestResponse.Endpoint();
+        List<EndpointRequestResponse.Endpoint> epList = 
+                new ArrayList<EndpointRequestResponse.Endpoint>();
+        
+        /*
+         * If we didn't find any EPs, send the 
+         * error response
+         */
+        if ((ctx.getEps() == null) || (ctx.getEps().size() <= 0)) {
+            EndpointRequestResponse.Error error = 
+                    new EndpointRequestResponse.Error();
+            error.setMessage(NO_ENDPOINTS);
+            response.setError(error);
+        }
+        else {
+            EndpointRequestRequest.Params params = req.getParams().get(0);
+
+            /*
+             * If we get any EP, then we can
+             * provide a response to the original request
+             * Note that we could potentially have multiple
+             * requests outstanding for the same EP, and 
+             * even using different context types (L2 or L3).
+             */
+            for (Endpoint e : ctx.getEps()) {
+                List<String> ids = new ArrayList<String>();
+
+                L2BridgeDomainId l2Context = 
+                        e.getL2Context();
+                if (l2Context != null && 
+                        l2Context.getValue().equals(params.getContext())) {
+                    ids.add(e.getMacAddress().getValue());
+                    endpoint.setIdentifier(ids);
+                    endpoint.setContext(l2Context.getValue());
+                }
+                /* TODO: Need to look this up in op store */
+                //endpoint.setLocation("");
+                //endpoint.setPolicy_name("");
+                //endpoint.setStatus("");
+                //endpoint.setSubject("");
+                endpoint.setPrr(DEFAULT_PRR);
+                epList.add(endpoint);
+                /*
+                 * For EPs on a different agent, we need to look up the 
+                 * VTEP information. For now, we're only supporting 
+                 * VXLAN VTEPs, so we look up the destination tunnel IP,
+                 * and provide that in the data field of the response
+                 */
+                // TODO: Need to look this up in op store
+                //endpoint.setData();
+            }
+            result.setEndpoint(epList);
+            response.setResult(result);
+        }
+        try {
+            ctx.getEp().sendResponse(response);
+        }
+        catch (Throwable t) {
+            // TODO: implement
+        }
+        this.l2RpcCtxts.remove(ctx);
+    }
+
+    /**
+     * This notification handles the callback from a query 
+     * of the L3 Endpoint Registry
+     */
+    
+    @Override
+    public void callback(L3EprContext ctx) {        
+        if (!(ctx.getRequest() instanceof EndpointRequestRequest)) {
+            return;
+        }
+        EndpointRequestRequest req = 
+                (EndpointRequestRequest)ctx.getRequest();
+        EndpointRequestResponse response = new EndpointRequestResponse();
+        response.setId(ctx.getRequest().getId());
+        EndpointRequestResponse.Result result = 
+                new EndpointRequestResponse.Result();
+        EndpointRequestResponse.Endpoint endpoint = 
+                new EndpointRequestResponse.Endpoint();
+        List<EndpointRequestResponse.Endpoint> epList = 
+                new ArrayList<EndpointRequestResponse.Endpoint>();
+        
+        /*
+         * If we didn't find any EPs, send the 
+         * error response
+         */
+        if ((ctx.getEps() == null) || (ctx.getEps().size() <= 0)) {
+            EndpointRequestResponse.Error error = 
+                    new EndpointRequestResponse.Error();
+            error.setMessage(NO_ENDPOINTS);
+            response.setError(error);
+        }
+        else {
+            EndpointRequestRequest.Params params = req.getParams().get(0);
+
+            /*
+             * If we get any EP, then we can
+             * provide a response to the original request
+             * Note that we could potentially have multiple
+             * requests outstanding for the same EP, and 
+             * even using different context types (L2 or L3).
+             */
+            for (EndpointL3 e : ctx.getEps()) {
+                List<String> ids = new ArrayList<String>();
+
+                String l3Context = "";
+                
+                /* 
+                 * The OpFlex RFC indicates that a single 
+                 * Endpoint Request can match on multiple
+                 * Endpoints, as the identifiers may not
+                 * be unique (e.g. multiple IP addresses). 
+                 * However, GBP scopes the endpoint's 
+                 * identifier with the L3 context, which
+                 * means there will only be a single match.
+                 * As a result, send the response once we
+                 * get a single EP
+                 */
+                for (L3Address l3Addr : e.getL3Address()) {
+                    if (l3Addr.getL3Context().getValue()
+                            .equals(params.getContext())) {
+                        if (l3Addr.getIpAddress().getIpv4Address() != null) { 
+                        ids.add(l3Addr.
+                                getIpAddress()
+                                .getIpv4Address().getValue().toString());
+                        }
+                        else if (l3Addr.getIpAddress().getIpv6Address() != null) {
+                            ids.add(l3Addr.getIpAddress().
+                                    getIpv6Address().getValue().toString());
+                        }
+                        l3Context = l3Addr.getL3Context().getValue();
+                    }
+                }
+                if (ids.size() > 0) {
+                    endpoint.setIdentifier(ids);
+                }
+                endpoint.setContext(l3Context);
+                /* TODO: get these from the op store */
+                //endpoint.setLocation("");
+                //endpoint.setPolicy_name("");
+                //endpoint.setStatus("");
+                //endpoint.setSubject("");
+                endpoint.setPrr(DEFAULT_PRR);
+                epList.add(endpoint);
+                /*
+                 * For EPs on a different agent, we need to look up the 
+                 * VTEP information. For now, we're only supporting 
+                 * VXLAN VTEPs, so we look up the destination tunnel IP,
+                 * and provide that in the data field of the response
+                 */
+                // TODO: get this from the op store
+                //endpoint.setData();
+            }
+            result.setEndpoint(epList);
+            response.setResult(result);
+        }
+        try {
+            ctx.getEp().sendResponse(response);
+        }
+        catch (Throwable t) {
+            // TODO: implement
+        }
+        this.l3RpcCtxts.remove(ctx);
+    }    
+    
+}
\ No newline at end of file
diff --git a/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/EpKey.java b/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/EpKey.java
new file mode 100644 (file)
index 0000000..ac9ea5d
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import javax.annotation.concurrent.Immutable;
+
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.MacAddress;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.L2ContextId;
+
+/**
+ * A key for a single endpoint
+ */
+@Immutable
+public class EpKey {
+
+    final L2ContextId l2Context;
+    final MacAddress macAddress;
+    
+    public EpKey(L2ContextId l2Context, MacAddress macAddress) {
+        super();
+        this.l2Context = l2Context;
+        this.macAddress = macAddress;
+    }
+
+    public L2ContextId getL2Context() {
+        return l2Context;
+    }
+
+    public MacAddress getMacAddress() {
+        return macAddress;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result +
+                 ((l2Context == null) ? 0 : l2Context.hashCode());
+        result = prime * result +
+                 ((macAddress == null) ? 0 : macAddress.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        EpKey other = (EpKey) obj;
+        if (l2Context == null) {
+            if (other.l2Context != null)
+                return false;
+        } else if (!l2Context.equals(other.l2Context))
+            return false;
+        if (macAddress == null) {
+            if (other.macAddress != null)
+                return false;
+        } else if (!macAddress.equals(other.macAddress))
+            return false;
+        return true;
+    }
+    
+}
diff --git a/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/Identity.java b/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/Identity.java
new file mode 100644 (file)
index 0000000..6e4dff8
--- /dev/null
@@ -0,0 +1,257 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
+
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddressBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.MacAddress;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.L2BridgeDomainId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.L3ContextId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoint.fields.L3Address;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoint.fields.L3AddressBuilder;
+
+import com.google.common.net.InetAddresses;
+
+
+/**
+ * An Identity for OpFlex. Identities can take on many
+ * forms, so it's possible that this class may be replaced
+ * by an abstract class with different concrete types.
+ * At the moment, we're only dealing with IP and MAC
+ * addresses.
+ * 
+ * This class also provides methods for getting the identity
+ * in forms by the yang model, and are therefore usable by
+ * other classes in the policy model (e.g. the objects
+ * needed by the Endpoint Registry).
+ *
+ */
+public class Identity {
+    enum IdentityType {
+        UNKNOWN, IP_ADDRESS, MAC_ADDRESS;
+    }
+    private IdentityType type = IdentityType.UNKNOWN;
+    private L3ContextId l3Context = null;
+    private IpAddress primaryIp = null;
+    private Set<IpAddress> ips = null;
+    private L2BridgeDomainId l2Context = null;
+    private MacAddress mac = null;
+    public Identity(String id) {
+        /*
+         * Determine the ID type and populate
+         */
+        if (idIsIp(id)) {
+            type = IdentityType.IP_ADDRESS;
+            ips = Collections.newSetFromMap(new ConcurrentHashMap<IpAddress, Boolean>());
+            if (primaryIp == null) primaryIp = normalizeIpAddress(id);
+            ips.add(normalizeIpAddress(id));
+        }
+        else if (idIsMac(id)) {
+            type = IdentityType.MAC_ADDRESS;
+            mac = normalizeMacAddress(id);
+        }
+        
+    }
+    
+    public void setContext(String context) {
+        switch (type) {
+        case MAC_ADDRESS:
+            l2Context = new L2BridgeDomainId(context);
+            break;
+        case IP_ADDRESS:
+            l3Context = new L3ContextId(context);
+            break;
+        default:
+            break;
+        }
+    }
+    
+    /**
+     * Adds a new identifier to the list. Some types of
+     * identities allow for list of identifiers (e.g. L3).
+     * 
+     * @param id The new identifier to add to the list
+     */
+    public void addId(String id) {
+        switch (type) {
+        case IP_ADDRESS:
+            ips.add(normalizeIpAddress(id));
+            break;
+        default:
+            break;
+        }
+    }
+    
+    private boolean idIsIp(String id) {
+        return InetAddresses.isInetAddress(id);
+    }
+    
+    /*
+     * Verifies MAC addresses with the following formats:
+     * 0xAA:0xBB:0xCC:0xDD:0xEE:0xFF
+     * AA:BB:CC:DD:EE:FF
+     * 0xAA:BB:CC:DD:EE:FF
+     * 0xAA-0xBB-0xCC-0xDD-0xEE-0xFF
+     */
+    private boolean idIsMac(String id) {
+        /*
+         * First check/remove separators
+         */
+        String[] sixFields = id.split(":");
+        if (sixFields.length != 6) {
+            sixFields = id.split("-");
+            if (sixFields.length != 6) {
+                return false;
+            }
+        }
+
+        for (String field : sixFields) {
+            /* Strip '0x' if present */
+            field = field.replace("0x", "");
+            if (field.length() > 2 || field.length() <1) {
+                return false;
+            }
+            if (!Pattern.matches("[0-9a-fA-F]{1,2}", field)) {
+                return false;
+            }
+        }
+        return true;        
+    }
+    
+    /**
+     * Check if this {@link Identity} is an L3 type (Ip Address)
+     * 
+     * @return true if L3, false if not
+     */
+    public boolean isL3() {
+        return (type == IdentityType.IP_ADDRESS);
+    }
+    
+    /**
+     * Check if this {@link Identity} is an L2 type (MAC Address) 
+     * 
+     * @return true if L2, false if not
+     */
+    public boolean isL2() {
+        return (type == IdentityType.MAC_ADDRESS);
+    }
+
+    /**
+     * Return the context, regardless of type, as a string.
+     * 
+     * @return String representing the context for this Identity
+     */
+    public String contextAsString() {
+        switch (type) {
+        case MAC_ADDRESS:
+            return l2Context.toString();
+        case IP_ADDRESS:
+            return l3Context.toString();
+        default:
+            return null;
+        }
+    }
+
+    /**
+     * Returns the identity as a string. The format
+     * of the string depends on the identity type.
+     * When the identity is a actually a list, only
+     * the first identity is returned.
+     * 
+     * @return null if type is UKNOWN, otherwise String
+     */
+    public String identityAsString() {
+        switch (type) {
+        case MAC_ADDRESS:
+            return mac.getValue();
+        case IP_ADDRESS:
+            List<IpAddress> ipl = new ArrayList<IpAddress>(ips);
+            IpAddress i = ipl.get(0);
+            if (i.getIpv4Address() != null) {
+                return i.getIpv4Address().getValue();
+            }
+            else if (i.getIpv6Address() != null) {
+                return i.getIpv6Address().getValue();
+            }
+        default:
+        }
+        return null;
+    }
+    
+    /**
+     * Get the L2 context in an Endpoint Registry 
+     * compatible format
+     * 
+     * @return The Layer 2 context
+     */
+    public L2BridgeDomainId getL2Context() {
+        return l2Context;
+    }
+    
+    /**
+     * Get the L2 identity in an Endpoint Registry 
+     * compatible format
+     * 
+     * @return The Layer 2 identity
+     */
+    public MacAddress getL2Identity() {
+        return mac;
+    }
+
+    /**
+     * Get the L3 context in an Endpoint Registry 
+     * compatible format
+     * 
+     * @return The Layer 3 context
+     */    
+    public L3ContextId getL3Context() {
+        return l3Context;
+    }
+
+    /**
+     * Get the L3 identity in an Endpoint Registry 
+     * compatible format
+     * 
+     * @return The Layer 3 identity
+     */
+    public IpAddress getL3Identity() {
+        return primaryIp;
+    }
+
+    public List<L3Address> getL3Addresses() {
+
+        List<L3Address> l3List= new ArrayList<L3Address>();
+        List<IpAddress> ipList = new ArrayList<IpAddress>();
+        ipList.addAll(ips);
+        for (IpAddress i: ipList){ 
+            L3AddressBuilder l3ab = new L3AddressBuilder();
+            l3ab.setIpAddress(i);
+            l3ab.setL3Context(l3Context);
+            l3List.add(l3ab.build());
+        }
+
+        return l3List;
+    }
+
+    private IpAddress normalizeIpAddress(String identifier) {
+        return IpAddressBuilder.getDefaultInstance(identifier);
+    }
+    
+    private MacAddress normalizeMacAddress(String identifier) {
+        MacAddress m = new MacAddress(identifier);
+        return m;
+    }
+}
diff --git a/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/L2EprContext.java b/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/L2EprContext.java
new file mode 100644 (file)
index 0000000..1b98b88
--- /dev/null
@@ -0,0 +1,177 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.MacAddress;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.EndpointGroupId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.L2BridgeDomainId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.Endpoints;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.Endpoint;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+
+/**
+ * A context for managing operations to the Endpoint Registry's 
+ * list of L2 Endpoints.
+ * 
+ * @author tbachman
+ *
+ */
+public class L2EprContext implements FutureCallback<Optional<Endpoint>>{
+    // TODO: hacks for now :(
+    private static final String DEFAULT_TENANT = "d7f08a78-a435-45c3-b4be-a634829be541";
+    private static final String DEFAULT_EPG = "b67946f0-4ac5-4b44-aa85-059fb0b0d475";
+    
+    public interface Callback {
+        public void callback(L2EprContext ctx);
+    }
+    private DataBroker dataProvider;
+    private ScheduledExecutorService executor;    
+    private final JsonRpcEndpoint peer;
+    private final RpcMessage request;
+    private final int numIdentifiers;
+    private Callback cb;
+    private int calls;
+    private Set<Endpoint> eps;
+    public L2EprContext(JsonRpcEndpoint peer, RpcMessage request, 
+            int numIdentifiers,  
+            DataBroker dataProvider, ScheduledExecutorService executor) {
+        this.peer = peer;
+        this.request = request;
+        this.numIdentifiers = numIdentifiers;
+        this.dataProvider = dataProvider;
+        this.executor = executor;
+        this.calls = numIdentifiers;
+        eps = Collections.newSetFromMap(new ConcurrentHashMap<Endpoint, Boolean>());
+    }
+    public void setCallback(Callback callback) {
+        this.cb = callback;
+    }
+    public JsonRpcEndpoint getEp() {
+        return peer;
+    }
+    public RpcMessage getRequest() {
+        return request;
+    }
+
+    public int getNumIdentifiers() {
+        return numIdentifiers;
+    }
+
+    public Set<Endpoint> getEps() {
+        return eps;
+    }
+    public void setEp(Endpoint ep) {
+        eps.add(ep);
+    }
+
+    /**
+     * Create an L2 Endpoint in the Endopint Registry
+     * 
+     * @param req The OpFlex EP Declaration Request message
+     * @param id The identity of the EP to create
+     */
+    public void createL2Ep(String context, Identity id) {
+        EndpointBuilder epBuilder = new EndpointBuilder();
+
+        id.setContext(context);
+        epBuilder.setL2Context(id.getL2Context());
+        epBuilder.setMacAddress(id.getL2Identity());
+        
+        // TODO: add timestamp support
+        //epBuilder.setTimestamp(Timestamp);
+        // TODO: add support for conditions
+        //epBuilder.setCondition(new List<ConditionName>());
+
+        // TODO: where do we get the tenant and EPG?
+        TenantId tid = new TenantId(DEFAULT_TENANT);
+        EndpointGroupId eid = new EndpointGroupId(DEFAULT_EPG);
+        epBuilder.setTenant(tid);
+        epBuilder.setEndpointGroup(eid);
+
+        Endpoint ep = epBuilder.build();
+        InstanceIdentifier<Endpoint> iid = 
+                InstanceIdentifier.builder(Endpoints.class)
+                .child(Endpoint.class, ep.getKey())
+                .build();
+        WriteTransaction wt = dataProvider.newWriteOnlyTransaction();
+        wt.put(LogicalDatastoreType.OPERATIONAL, iid, ep);
+        wt.submit();
+    }
+
+    /**
+     * Look up an L2 endpoint in the registry, given a context
+     * and an identifier.
+     * .
+     * @param context The L2 Context
+     * @param identifier The L2 identifier
+     */
+    public void lookupEndpoint(String context, String identifier) {
+
+        if (context == null || identifier == null) return;
+
+        MacAddress mac = new MacAddress(identifier);
+        EndpointKey key = 
+                new EndpointKey(new L2BridgeDomainId(context), mac);
+        InstanceIdentifier<Endpoint> iid = 
+                InstanceIdentifier.builder(Endpoints.class)
+                .child(Endpoint.class, key)
+                .build();
+        ListenableFuture<Optional<Endpoint>> dao =
+                dataProvider.newReadOnlyTransaction()
+                    .read(LogicalDatastoreType.OPERATIONAL, iid);
+        Futures.addCallback(dao, this, executor);
+    }
+
+    @Override
+    public void onSuccess(final Optional<Endpoint> result) {
+        calls--;
+        if (!result.isPresent()) {
+            /*
+             * This EP doesn't exist in the registry. If 
+             * all of the data store queries have been made,
+             * and we still don't have any EPs, then provide
+             * an error result.
+             */
+            if (calls <= 0) {
+                cb.callback(this);
+            }
+            return;
+        }
+        setEp(result.get());
+
+        cb.callback(this);
+    }
+
+
+    @Override
+    public void onFailure(Throwable t) {
+        // TODO: implement another callback
+    }
+    
+    
+}
diff --git a/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/L3EprContext.java b/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/L3EprContext.java
new file mode 100644 (file)
index 0000000..1c94c06
--- /dev/null
@@ -0,0 +1,176 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.EndpointGroupId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.Endpoints;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3Builder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3Key;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+import com.google.common.base.Optional;
+import com.google.common.net.InetAddresses;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+
+/**
+ * A context for managing operations to the Endpoint Registry's 
+ * list of L3 Endpoints.
+ * 
+ * @author tbachman
+ *
+ */
+public class L3EprContext implements FutureCallback<Optional<EndpointL3>>{
+    // TODO: hacks for now :(
+    private static final String DEFAULT_TENANT = "d7f08a78-a435-45c3-b4be-a634829be541";
+    private static final String DEFAULT_EPG = "b67946f0-4ac5-4b44-aa85-059fb0b0d475";
+    
+    public interface Callback {
+        public void callback(L3EprContext ctx);
+    }
+    private DataBroker dataProvider;
+    private ScheduledExecutorService executor;      
+    private final JsonRpcEndpoint peer;
+    private final RpcMessage request;
+    private final int numIdentifiers;
+    private Callback cb;
+    private int calls;
+    private Set<EndpointL3> eps;
+    public L3EprContext(JsonRpcEndpoint peer, RpcMessage request, 
+            int numIdentifiers, DataBroker dataProvider, ScheduledExecutorService executor) {
+        this.peer = peer;
+        this.request = request;
+        this.numIdentifiers = numIdentifiers;
+        this.dataProvider = dataProvider;
+        this.executor = executor;        
+        this.calls = numIdentifiers;
+        eps = Collections.newSetFromMap(new ConcurrentHashMap<EndpointL3, Boolean>());
+    }
+    public void setCallback(Callback callback) {
+        this.cb = callback;
+    }
+    public JsonRpcEndpoint getEp() {
+        return peer;
+    }
+    public RpcMessage getRequest() {
+        return request;
+    }
+
+    public int getNumIdentifiers() {
+        return numIdentifiers;
+    }
+
+    public Set<EndpointL3> getEps() {
+        return eps;
+    }
+    public void setEp(EndpointL3 ep) {
+        eps.add(ep);
+    }
+    
+    /**
+     * Create an L3 Endpoint in the Endopint Registry
+     * 
+     * @param req The OpFlex EP Declaration Request message
+     * @param id The identity of the EP to create
+     */
+    public void createL3Ep(String context, List<String> ids, Identity id) {
+        EndpointL3Builder epBuilder = new EndpointL3Builder();
+
+        for ( String l3Addr : ids ) {
+            if (InetAddresses.isInetAddress(l3Addr)) {
+                id.addId(l3Addr);
+            }
+        }
+        id.setContext(context);
+        epBuilder.setIpAddress(id.getL3Identity());
+        epBuilder.setL3Context(id.getL3Context());
+        epBuilder.setL3Address(id.getL3Addresses());
+
+        // TODO: where do we get the tenant and EPG?
+        TenantId tid = new TenantId(DEFAULT_TENANT);
+        EndpointGroupId eid = new EndpointGroupId(DEFAULT_EPG);
+        epBuilder.setTenant(tid);
+        epBuilder.setEndpointGroup(eid);
+
+        EndpointL3 ep = epBuilder.build();
+        InstanceIdentifier<EndpointL3> iid = 
+                InstanceIdentifier.builder(Endpoints.class)
+                .child(EndpointL3.class, ep.getKey())
+                .build();
+        WriteTransaction wt = dataProvider.newWriteOnlyTransaction();
+        wt.put(LogicalDatastoreType.OPERATIONAL, iid, ep);
+        wt.submit();
+    }
+
+    /**
+     * Look up an L3 endpoint in the registry, given a context
+     * and an identifier.
+     * .
+     * @param context The L3 Context
+     * @param identifier The L3 identifier
+     */
+    public void lookupEndpoint(String context, String identifier) {
+
+        if (context == null || identifier == null) return;
+        Identity i = new Identity(identifier);
+        i.setContext(context);
+
+        EndpointL3Key key = new EndpointL3Key(i.getL3Identity(), i.getL3Context());
+        InstanceIdentifier<EndpointL3> iid = 
+                InstanceIdentifier.builder(Endpoints.class)
+                .child(EndpointL3.class, key)
+                .build();
+        ListenableFuture<Optional<EndpointL3>> dao =
+                dataProvider.newReadOnlyTransaction()
+                    .read(LogicalDatastoreType.OPERATIONAL, iid);
+        Futures.addCallback(dao, this, executor);
+    }
+
+    @Override
+    public void onSuccess(final Optional<EndpointL3> result) {
+        calls--;
+        if (!result.isPresent()) {
+            /*
+             * This EP doesn't exist in the registry. If 
+             * all of the data store queries have been made,
+             * and we still don't have any EPs, then provide
+             * an error result.
+             */
+            if (calls <= 0) {
+                cb.callback(this);
+            }
+            return;
+        }
+        setEp(result.get());
+
+        cb.callback(this);
+    }
+
+
+    @Override
+    public void onFailure(Throwable t) {
+        // TODO: implement another callback
+    }
+
+}
index 4a487130b5f416d6e48074c3b3256455cce0c05f..0b323c967002054b8bedb842ca304aef72b92d1a 100644 (file)
@@ -297,13 +297,6 @@ public class OpflexConnectionService
         }
         opflexListenIp = listenIp;
 
-        /*
-         * Set up the messages supported by each OpFlex policy
-         * component
-         */
-        /* this class implements identity handlers */
-        subscribe(new IdentityRequest(), this);
-
         initializeServers();
     }
 
@@ -541,6 +534,7 @@ public class OpflexConnectionService
      @Override
      public void close() throws ExecutionException, InterruptedException {
 
+         stopping();
          executor.shutdownNow();
 
          if (dataProvider != null) {
diff --git a/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexRenderer.java b/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexRenderer.java
new file mode 100644 (file)
index 0000000..22a397a
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Renderer that uses OpenFlow and OVSDB to implement an overlay network
+ * using Open vSwitch.
+ * @author readams
+ */
+public class OpflexRenderer implements AutoCloseable, DataChangeListener {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(OpflexRenderer.class);
+
+    private final DataBroker dataBroker;
+    private final PolicyResolver policyResolver;
+    private final EndpointManager endpointManager;
+    private final PolicyManager policyManager;
+    private final OpflexConnectionService connectionService;
+
+    private final ScheduledExecutorService executor;
+
+    ListenerRegistration<DataChangeListener> configReg;
+
+    public OpflexRenderer(DataBroker dataProvider,
+                             RpcProviderRegistry rpcRegistry) {
+        super();
+        this.dataBroker = dataProvider;
+
+        int numCPU = Runtime.getRuntime().availableProcessors();
+        executor = Executors.newScheduledThreadPool(numCPU * 2);
+
+        connectionService = new OpflexConnectionService();
+        connectionService.setDataProvider(dataBroker);
+        
+        endpointManager = new EndpointManager(dataProvider, rpcRegistry,
+                                              executor, connectionService);
+        policyResolver = new PolicyResolver(dataProvider, executor);
+
+        policyManager = new PolicyManager(dataProvider,
+                                          policyResolver,
+                                          endpointManager,
+                                          rpcRegistry,
+                                          executor);
+        
+        final ListenerRegistration<DataChangeListener> dataChangeListenerRegistration =
+                dataBroker
+                .registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
+                        OpflexConnectionService.DISCOVERY_IID,
+                        connectionService, DataChangeScope.SUBTREE );
+
+        final class AutoCloseableConnectionService implements AutoCloseable {
+            @Override
+            public void close() throws Exception {
+                connectionService.stopping();
+                dataChangeListenerRegistration.close();
+            }
+        }
+
+        LOG.info("Initialized OpFlex renderer");
+    }
+
+    // *************
+    // AutoCloseable
+    // *************
+
+    @Override
+    public void close() throws Exception {
+        executor.shutdownNow();
+        if (configReg != null) configReg.close();
+        if (policyResolver != null) policyResolver.close();
+        if (connectionService != null) connectionService.close();
+        if (endpointManager != null) endpointManager.close();
+    }
+
+    // ******************
+    // DataChangeListener
+    // ******************
+
+    @Override
+    public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>,
+                                                   DataObject> change) {
+    }
+
+    // **************
+    // Implementation
+    // **************
+}
diff --git a/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/PolicyManager.java b/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/PolicyManager.java
new file mode 100644 (file)
index 0000000..18de0d7
--- /dev/null
@@ -0,0 +1,348 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.groupbasedpolicy.resolver.ConditionGroup;
+import org.opendaylight.groupbasedpolicy.resolver.EgKey;
+import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
+import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
+import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
+import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
+import org.opendaylight.groupbasedpolicy.util.SetUtils;
+import org.opendaylight.groupbasedpolicy.util.SingletonTask;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.UniqueId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manage policies on switches by subscribing to updates from the 
+ * policy resolver and information about endpoints from the endpoint 
+ * registry
+ * @author tbachman
+ */
+public class PolicyManager 
+     implements PolicyListener, EndpointListener {
+    private static final Logger LOG = 
+            LoggerFactory.getLogger(PolicyManager.class);
+
+    private final DataBroker dataBroker;
+    private final PolicyResolver policyResolver;
+    
+    private final PolicyScope policyScope;
+    
+    private final AtomicReference<Dirty> dirty;
+    
+    private final ScheduledExecutorService executor;
+    private final SingletonTask flowUpdateTask;
+
+    /**
+     * Counter used to allocate ordinal values for forwarding contexts
+     * and VNIDs
+     */
+    private final AtomicInteger policyOrdinal = new AtomicInteger(1);
+    
+    /**
+     * Keep track of currently-allocated ordinals
+     */
+    // XXX For the endpoint groups, we need a globally unique ordinal, so
+    // should ultimately involve some sort of distributed agreement
+    // or a leader to allocate them.  For now we'll just use a counter and
+    // this local map.  Also theoretically need to garbage collect periodically
+    private final ConcurrentMap<TenantId, ConcurrentMap<String, Integer>> ordinals = 
+            new ConcurrentHashMap<>();
+    // XXX - need to garbage collect
+    private final ConcurrentMap<ConditionGroup, Integer> cgOrdinals = 
+            new ConcurrentHashMap<>();
+            
+    public PolicyManager(DataBroker dataBroker,
+                         PolicyResolver policyResolver,
+                         EndpointManager endpointManager, 
+                         RpcProviderRegistry rpcRegistry,
+                         ScheduledExecutorService executor) {
+        super();
+        this.dataBroker = dataBroker;
+        this.executor = executor;
+        this.policyResolver = policyResolver;
+
+
+        policyScope = policyResolver.registerListener(this);
+
+        endpointManager.registerListener(this);
+        
+        dirty = new AtomicReference<>(new Dirty());
+        
+        flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
+        scheduleUpdate();
+        
+        LOG.debug("Initialized OFOverlay policy manager");
+    }
+
+
+    // ****************
+    // EndpointListener
+    // ****************
+    
+    @Override
+    public void endpointUpdated(EpKey epKey) {
+        dirty.get().addEndpoint(epKey);
+        scheduleUpdate();
+    }
+
+    @Override
+    public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
+        dirty.get().addNodeEp(nodeId, epKey);
+        scheduleUpdate();
+    }
+
+    @Override
+    public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
+        dirty.get().addEndpointGroupEp(egKey, epKey);
+        policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
+        scheduleUpdate();
+    }
+
+    // **************
+    // PolicyListener
+    // **************
+    
+    @Override
+    public void policyUpdated(Set<EgKey> updatedConsumers) {
+        for (EgKey key : updatedConsumers) {
+            dirty.get().addEndpointGroup(key);
+        }
+        scheduleUpdate();
+    }
+
+    // *************
+    // PolicyManager
+    // *************
+
+    /**
+     * Set the learning mode to the specified value
+     * @param learningMode the learning mode to set
+     */
+    public void setLearningMode(LearningMode learningMode) {
+        // No-op for now
+    }
+
+    /**
+     * Get a unique ordinal for the given condition group, suitable for
+     * use in the data plane.  This is unique only for this node, and not 
+     * globally.
+     * @param cg the {@link ConditionGroup}
+     * @return the unique ID
+     */
+    public int getConfGroupOrdinal(final ConditionGroup cg) {
+        if (cg == null) return 0;
+        Integer ord = cgOrdinals.get(cg);
+        if (ord == null) {
+            ord = policyOrdinal.getAndIncrement();
+            Integer old = cgOrdinals.putIfAbsent(cg, ord);
+            if (old != null) ord = old; 
+        }
+        return ord.intValue();
+    }
+
+    /**
+     * Get a 32-bit context ordinal suitable for use in the OF data plane
+     * for the given policy item.  Note that this function may block
+     * @param tenantId the tenant ID of the element
+     * @param id the unique ID for the element
+     * @return the 32-bit ordinal value
+     */
+    public int getContextOrdinal(final TenantId tenantId, 
+                                 final UniqueId id) throws Exception {
+        if (tenantId == null || id == null) return 0;
+        ConcurrentMap<String, Integer> m = ordinals.get(tenantId);
+        if (m == null) {
+            m = new ConcurrentHashMap<>();
+            ConcurrentMap<String, Integer> old = 
+                    ordinals.putIfAbsent(tenantId, m);
+            if (old != null) m = old;
+        }
+        Integer ord = m.get(id.getValue());
+        if (ord == null) {
+            ord = policyOrdinal.getAndIncrement();
+            Integer old = m.putIfAbsent(id.getValue(), ord);
+            if (old != null) ord = old;
+        }
+
+        return ord.intValue();
+//        while (true) {
+//            final ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
+//            InstanceIdentifier<DataPlaneOrdinal> iid =
+//                    InstanceIdentifier.builder(OfOverlayOperational.class)
+//                    .child(DataPlaneOrdinal.class, 
+//                           new DataPlaneOrdinalKey(id, tenantId))
+//                    .build();
+//            ListenableFuture<Optional<DataObject>> r = 
+//                    t.read(LogicalDatastoreType.OPERATIONAL, iid);
+//            Optional<DataObject> res = r.get();
+//            if (res.isPresent()) {
+//                DataPlaneOrdinal o = (DataPlaneOrdinal)res.get();
+//                return o.getOrdinal().intValue();
+//            }
+//            final int ordinal = policyOrdinal.getAndIncrement();
+//            OfOverlayOperational oo = new OfOverlayOperationalBuilder()
+//                .setDataPlaneOrdinal(ImmutableList.of(new DataPlaneOrdinalBuilder()
+//                    .setId(id)
+//                    .setTenant(tenantId)
+//                    .setOrdinal(Long.valueOf(ordinal))
+//                    .build()))
+//                .build();
+//            t.merge(LogicalDatastoreType.OPERATIONAL, 
+//                    InstanceIdentifier.builder(OfOverlayOperational.class)
+//                    .build(), 
+//                    oo);
+//            ListenableFuture<RpcResult<TransactionStatus>> commitr = t.commit();
+//            try {
+//                commitr.get();
+//                return ordinal;
+//            } catch (ExecutionException e) {
+//                if (e.getCause() instanceof OptimisticLockFailedException)
+//                    continue;
+//                throw e;
+//            }
+//        }
+    }
+    
+    // **************
+    // Implementation
+    // **************
+
+    private void scheduleUpdate() {
+
+        // TODO: send policy updates
+    }
+    
+    /**
+     * Update the flows on a particular switch
+     */
+    private class SwitchFlowUpdateTask implements Callable<Void> {
+        private final Dirty dirty;
+        private final NodeId nodeId;
+
+        public SwitchFlowUpdateTask(Dirty dirty, NodeId nodeId) {
+            super();
+            this.dirty = dirty;
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            PolicyInfo info = policyResolver.getCurrentPolicy();
+            if (info == null) return null;
+
+            return null;
+        }
+    }
+
+    /**
+     * Update all flows on all switches as needed.  Note that this will block
+     * one of the threads on the executor.
+     * @author readams
+     */
+    private class FlowUpdateTask implements Runnable {
+        @Override
+        public void run() {
+            LOG.debug("Beginning flow update task");
+
+            Dirty d = dirty.getAndSet(new Dirty());
+            CompletionService<Void> ecs
+                = new ExecutorCompletionService<Void>(executor);
+            int n = 0;
+            for (int i = 0; i < n; i++) {
+                try {
+                    ecs.take().get();
+                } catch (InterruptedException | ExecutionException e) {
+                    LOG.error("Failed to update flow tables", e);
+                }
+            }
+            LOG.debug("Flow update completed");
+        }
+    }
+    
+    /**
+     * Dirty state since our last successful flow table sync.
+     */
+    public static class Dirty {
+        private Set<EpKey> endpoints;
+        private Set<NodeId> nodes;
+        private Set<EgKey> groups;
+        private ConcurrentMap<EgKey, Set<EpKey>> groupEps;
+        private ConcurrentMap<NodeId, Set<EpKey>> nodeEps;
+        
+        public Dirty() {
+            ConcurrentHashMap<EpKey,Boolean> epmap = new ConcurrentHashMap<>();
+            endpoints = Collections.newSetFromMap(epmap);
+            ConcurrentHashMap<NodeId,Boolean> nomap = new ConcurrentHashMap<>();
+            nodes = Collections.newSetFromMap(nomap);
+            ConcurrentHashMap<EgKey,Boolean> grmap = new ConcurrentHashMap<>();
+            groups = Collections.newSetFromMap(grmap);
+
+            groupEps = new ConcurrentHashMap<>();
+            nodeEps = new ConcurrentHashMap<>();
+        }
+        
+        public void addEndpointGroupEp(EgKey egKey, EpKey epKey) {
+            SetUtils.getNestedSet(egKey, groupEps)
+                .add(epKey);
+        }
+        public void addNodeEp(NodeId id, EpKey epKey) {
+            SetUtils.getNestedSet(id, nodeEps).add(epKey);
+        }
+        public void addNode(NodeId id) {
+            nodes.add(id);
+        }
+        public void addEndpointGroup(EgKey key) {
+            groups.add(key);
+        }
+        public void addEndpoint(EpKey epKey) {
+            endpoints.add(epKey);
+        }
+
+        public Set<EpKey> getEndpoints() {
+            return endpoints;
+        }
+
+        public Set<NodeId> getNodes() {
+            return nodes;
+        }
+
+        public Set<EgKey> getGroups() {
+            return groups;
+        }
+
+        public ConcurrentMap<EgKey, Set<EpKey>> getGroupEps() {
+            return groupEps;
+        }
+
+        public ConcurrentMap<NodeId, Set<EpKey>> getNodeEps() {
+            return nodeEps;
+        }
+        
+    }
+}
index c314eea8f8715cd24f9e9177bc0ebc476ea3e3c6..69e467233f9173b43a5842f046a779c3ddca0e9e 100644 (file)
@@ -13,6 +13,10 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointDeclarationRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointDeclarationResponse;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointRequestRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointRequestResponse;
 import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityRequest;
 import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityResponse;
 
@@ -30,6 +34,10 @@ public enum Role {
 
     static IdentityRequest idReq = new IdentityRequest();
     static IdentityResponse idRsp = new IdentityResponse();
+    static EndpointDeclarationRequest epDeclReq = new EndpointDeclarationRequest();
+    static EndpointDeclarationResponse epDeclRsp = new EndpointDeclarationResponse();
+    static EndpointRequestRequest epReqReq = new EndpointRequestRequest();
+    static EndpointRequestResponse epReqRsp = new EndpointRequestResponse();
 
     private final String role;
 
@@ -53,6 +61,8 @@ public enum Role {
             List<RpcMessage> msgList = new ArrayList<RpcMessage>();
             msgList.add(idReq);
             msgList.add(idRsp);
+            msgList.add(epDeclReq);
+            msgList.add(epReqReq);
             return msgList;
         }
         else if (role.equals(OBSERVER.toString())) {
index 54ff01c37cbeee95fe326b683153a394971679bc..e846fa7a78a9e314b2e39872ec6e68e23544a78c 100644 (file)
@@ -134,4 +134,18 @@ public class EndpointDeclarationRequest extends RpcMessage {
         this.name = DECLARATION_MESSAGE;
         this.method = DECLARATION_MESSAGE;
     }
+    
+    /**
+     * Minimal check on validity of message
+     * @return true if message has passed validity check
+     */
+    @JsonIgnore
+    @Override
+    public boolean valid() {
+        if (params == null)
+            return false;
+        if (params.get(0) == null)
+            return false;
+        return true;
+    }
 }
index b93fdc32cf6fc5df973ab9e31c3e0940dd032e14..ae2e554073b3e78e59d61cd5d97303f1b05a6c46 100644 (file)
@@ -95,5 +95,10 @@ public class EndpointDeclarationResponse extends RpcMessage {
     public void setName(String name) {
         this.name = name;
     }
-
+    @JsonIgnore
+    @Override
+    public boolean valid() {
+        return true;
+    }
+    
 }
index 5cf52a21eb859c759f05fc1d2c768fde7fae422b..94cba2ab067db1b06ef63b6328f1882d4cebad99 100644 (file)
@@ -134,4 +134,18 @@ public class EndpointPolicyUpdateRequest extends RpcMessage {
         this.name = EP_UPDATE_MESSAGE;
         this.method = EP_UPDATE_MESSAGE;
     }
+    
+    /**
+     * Minimal check on validity of message
+     * @return true if message has passed validity check
+     */
+    @JsonIgnore
+    @Override
+    public boolean valid() {
+        if (params == null)
+            return false;
+        if (params.get(0) == null)
+            return false;
+        return true;
+    }
 }
index 60571eb20b087b3a4172d009d404f8e54521e83e..928fa74948dc022b211a8676c082cee8350c2ccb 100644 (file)
@@ -95,5 +95,11 @@ public class EndpointPolicyUpdateResponse extends RpcMessage {
     public void setName(String name) {
         this.name = name;
     }
+    @JsonIgnore
+    @Override
+    public boolean valid() {
+        return true;
+    }
+    
 
 }
index 55cc73e93df6b4d22cb14a6583c3502393f8b23c..885e1a96b3cf6b0f8b4298a364d8809213718ab0 100644 (file)
@@ -98,4 +98,18 @@ public class EndpointRequestRequest extends RpcMessage {
     public EndpointRequestRequest() {
         this.name = EP_REQUEST_MESSAGE;
     }
+    
+    /**
+     * Minimal check on validity of message
+     * @return true if message has passed validity check
+     */
+    @JsonIgnore
+    @Override
+    public boolean valid() {
+        if (params == null)
+            return false;
+        if (params.get(0) == null)
+            return false;
+        return true;
+    }
 }
index 0b329b3fb50563a4072c7b5b728049d464944273..aa92f3ae0aa13922d4b56f89c7f3235a85886fae 100644 (file)
@@ -166,4 +166,10 @@ public class EndpointRequestResponse extends RpcMessage {
     public EndpointRequestResponse() {
         this.name = REQUEST_MESSAGE_RESPONSE;
     }
+    @JsonIgnore
+    @Override
+    public boolean valid() {
+        return true;
+    }
+    
 }
index dfd9161b79fbbed0ae8ac6831feace179fcc45c0..b2f1b9a477dcf6d81f9787ddb437bdf487775e42 100644 (file)
@@ -111,4 +111,18 @@ public class IdentityRequest extends RpcMessage {
         this.name = IDENTITY_MESSAGE;
         this.method = IDENTITY_MESSAGE;
     }
+    
+    /**
+     * Minimal check on validity of message
+     * @return true if message has passed validity check
+     */
+    @JsonIgnore
+    @Override
+    public boolean valid() {
+        if (params == null)
+            return false;
+        if (params.get(0) == null)
+            return false;
+        return true;
+    }
 }
index 7507ca3cc20048e6c07239234fa37dc19529b927..527a975780839ece8c0c122fcaa1f2b0c0d591ce 100644 (file)
@@ -163,5 +163,11 @@ public class IdentityResponse extends RpcMessage {
     public void setName(String name) {
         this.name = name;
     }
+    
+    @JsonIgnore
+    @Override
+    public boolean valid() {
+        return true;
+    }    
 
 }
index b61685fc178f4429c6852c5cf2b895877427ca87..8080ac49b932c4430a568006bc800c0563483dbc 100644 (file)
@@ -113,4 +113,18 @@ public class PolicyResolutionRequest extends RpcMessage {
         this.name = RESOLVE_MESSAGE;
         this.method = RESOLVE_MESSAGE;
     }
+    
+    /**
+     * Minimal check on validity of message
+     * @return true if message has passed validity check
+     */
+    @JsonIgnore
+    @Override
+    public boolean valid() {
+        if (params == null)
+            return false;
+        if (params.get(0) == null)
+            return false;
+        return true;
+    }
 }
index 6c2e9f825d225aa303014e448bb785bf07e0b05c..ddba237d75cd74a22fb783592f3c6e07cb220fdf 100644 (file)
@@ -112,4 +112,11 @@ public class PolicyResolutionResponse extends RpcMessage {
         this.name = name;
     }
 
+    @JsonIgnore
+    @Override
+    public boolean valid() {
+        return true;
+    }
+    
+
 }
index a2252d333083bbba466026b20b1310d30b55aef6..ea8704f90723c26cca0ff9bc2e5217b4eff20d25 100644 (file)
@@ -106,4 +106,18 @@ public class PolicyTriggerRequest extends RpcMessage {
         this.name = TRIGGER_MESSAGE;
         this.method = TRIGGER_MESSAGE;
     }
+    
+    /**
+     * Minimal check on validity of message
+     * @return true if message has passed validity check
+     */
+    @JsonIgnore
+    @Override
+    public boolean valid() {
+        if (params == null)
+            return false;
+        if (params.get(0) == null)
+            return false;
+        return true;
+    }
 }
index 3354a4c93c31ef875204164e99545e1b98dec5dd..f24eab6df207ad7e737377209d9d718e7bc907c9 100644 (file)
@@ -96,4 +96,11 @@ public class PolicyTriggerResponse extends RpcMessage {
         this.name = name;
     }
 
+    @JsonIgnore
+    @Override
+    public boolean valid() {
+        return true;
+    }
+    
+
 }
index 45ff687585fd75ac50923884d7d23ad95105d6bc..d938372a3de55edb769e2977383d4d07687acbe0 100644 (file)
@@ -99,4 +99,18 @@ public class PolicyUpdateRequest extends RpcMessage {
         this.name = UPDATE_MESSAGE;
         this.method = UPDATE_MESSAGE;
     }
+    
+    /**
+     * Minimal check on validity of message
+     * @return true if message has passed validity check
+     */
+    @JsonIgnore
+    @Override
+    public boolean valid() {
+        if (params == null)
+            return false;
+        if (params.get(0) == null)
+            return false;
+        return true;
+    }
 }
index 4a1c7d6aa459fed677d79703a23f7ba566e66cae..8ae35657cae361641c25c4e1c8ad3c4fb5de9d00 100644 (file)
@@ -96,4 +96,11 @@ public class PolicyUpdateResponse extends RpcMessage {
         this.name = name;
     }
 
+    @JsonIgnore
+    @Override
+    public boolean valid() {
+        return true;
+    }
+    
+
 }
index fdea27a275f2d65f8504ab58b82e8f5fa5d0264c..d888af15ab5557a4d0e618880385d12ae36dd7c7 100644 (file)
@@ -127,4 +127,18 @@ public class StateReportRequest extends RpcMessage {
         this.name = STATE_MESSAGE;
         this.method = STATE_MESSAGE;
     }
+    
+    /**
+     * Minimal check on validity of message
+     * @return true if message has passed validity check
+     */
+    @JsonIgnore
+    @Override
+    public boolean valid() {
+        if (params == null)
+            return false;
+        if (params.get(0) == null)
+            return false;
+        return true;
+    }
 }
index 8fd94c879cb17549106817574a1f8a1d50ff3146..d14f24132d2972a05efadc3c4f4935e25b4baa6d 100644 (file)
@@ -97,4 +97,11 @@ public class StateReportResponse extends RpcMessage {
         this.name = name;
     }
 
+    @JsonIgnore
+    @Override
+    public boolean valid() {
+        return true;
+    }
+    
+
 }
index 2224e11c3579d667d0b3fa220d2a8ba49d10ecb3..2a7afdcc1a8e42d1069c3dcf2805bcb647838c6b 100644 (file)
@@ -164,6 +164,11 @@ public class JsonRpcEndpointTest implements RpcBroker, RpcBroker.RpcCallback {
         public void setMethod(String method) {
             this.method = method;
         }
+        @JsonIgnore
+        @Override
+        public boolean valid() {
+            return true;
+        }
     }
 
     @Override
diff --git a/groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/IdentityTest.java b/groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/IdentityTest.java
new file mode 100644 (file)
index 0000000..e9bbafc
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ *  Authors : Thomas Bachman
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoint.fields.L3Address;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ *
+ */
+public class IdentityTest {
+    protected static final Logger logger = LoggerFactory.getLogger(IdentityTest.class);
+
+    Identity id;
+    
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        
+    }
+    
+    private static final String TEST_IP = "192.168.194.132";
+    private static final String TEST_MAC1 = "0x00:11:22:33:44:55";
+    private static final String TEST_MAC2 = "11:22:33:44:55:66";
+    private static final String TEST_MAC3 = "0xaa:0xBB:0xCC:0xdd:0xEE:0xFF";
+    private static final String TEST_MAC4 = "0x00-11-22-33-44-55";
+    private static final String TEST_MAC5 = "11-22-33-44-55-66";
+    private static final String TEST_MAC6 = "0xaa-0xBB-0xCC-0xdd-0xEE-0xFF";
+    private static final String TEST_MAC7 = "1:22:3:44:5:66";
+    private static final String TEST_MAC8 = "1-22-3-44-5-66";
+    private static final String TEST_CONTEXT = "foo";
+
+    @Test
+    public void testL3Identity() throws Exception {
+        id = new Identity(TEST_IP);
+        id.setContext(TEST_CONTEXT);
+        assertTrue(id.identityAsString().equals(TEST_IP));
+        assertTrue(id.getL3Context().getValue().equals(TEST_CONTEXT));
+        List<L3Address> lid = id.getL3Addresses();
+        assertTrue(lid.size() == 1);
+        for (L3Address l3addr : lid) {
+            assertTrue(l3addr.getIpAddress().equals(id.getL3Identity()));
+            assertTrue(l3addr.getL3Context().getValue().equals(TEST_CONTEXT));
+        }
+        //L2BridgeDomainId l2bdid = id.getL2Context();
+        //assertTrue(l2bdid.getValue().equals(TEST_CONTEXT));
+    }
+
+    @Test
+    public void testL2Identity() throws Exception {
+        id = new Identity(TEST_MAC1);
+        id.setContext(TEST_CONTEXT);
+        assertTrue(id.identityAsString().equals(TEST_MAC1));
+        assertTrue(id.getL2Context().getValue().equals(TEST_CONTEXT));
+        assertTrue(id.getL2Identity().getValue().equals(TEST_MAC1));
+        
+        id = new Identity(TEST_MAC2);
+        id.setContext(TEST_CONTEXT);
+        assertTrue(id.identityAsString().equals(TEST_MAC2));
+        assertTrue(id.getL2Identity().getValue().equals(TEST_MAC2));
+        
+        id = new Identity(TEST_MAC3);
+        id.setContext(TEST_CONTEXT);
+        assertTrue(id.identityAsString().equals(TEST_MAC3));
+        assertTrue(id.getL2Identity().getValue().equals(TEST_MAC3));
+        
+        id = new Identity(TEST_MAC4);
+        id.setContext(TEST_CONTEXT);
+        assertTrue(id.identityAsString().equals(TEST_MAC4));
+        assertTrue(id.getL2Identity().getValue().equals(TEST_MAC4));
+
+        id = new Identity(TEST_MAC5);
+        id.setContext(TEST_CONTEXT);
+        assertTrue(id.identityAsString().equals(TEST_MAC5));
+        assertTrue(id.getL2Identity().getValue().equals(TEST_MAC5));
+
+        id = new Identity(TEST_MAC6);
+        id.setContext(TEST_CONTEXT);
+        assertTrue(id.identityAsString().equals(TEST_MAC6));
+        assertTrue(id.getL2Identity().getValue().equals(TEST_MAC6));
+
+        id = new Identity(TEST_MAC7);
+        id.setContext(TEST_CONTEXT);
+        assertTrue(id.identityAsString().equals(TEST_MAC7));
+        assertTrue(id.getL2Identity().getValue().equals(TEST_MAC7));
+
+        id = new Identity(TEST_MAC8);
+        id.setContext(TEST_CONTEXT);
+        assertTrue(id.identityAsString().equals(TEST_MAC8));
+        assertTrue(id.getL2Identity().getValue().equals(TEST_MAC8));
+    }
+    
+}
diff --git a/groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/L2EprContextTest.java b/groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/L2EprContextTest.java
new file mode 100644 (file)
index 0000000..a2a72df
--- /dev/null
@@ -0,0 +1,124 @@
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ *  Authors : Thomas Bachman
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointDeclarationRequest;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.MacAddress;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.L2BridgeDomainId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.Endpoint;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+
+
+
+/**
+ *
+ */
+public class L2EprContextTest implements L2EprContext.Callback {
+    protected static final Logger logger = LoggerFactory.getLogger(L2EprContextTest.class);
+
+    private static final int TEST_SIZE = 1;
+    L2EprContext ctx = null;
+    private int callbacks;
+    @Mock
+    private DataBroker mockProvider;
+    private ScheduledExecutorService executor;    
+    @Mock
+    private JsonRpcEndpoint mockPeer;
+    @Mock
+    private WriteTransaction mockWriter;
+    @Mock
+    private ReadOnlyTransaction mockReader;
+    @Mock
+    private EndpointDeclarationRequest mockRequest;
+    @Mock
+    private Identity mockId;
+    @Mock
+    private L2BridgeDomainId mockL2Context;
+    @Mock
+    private MacAddress mockMac;
+    @Mock
+    private CheckedFuture<Optional<Endpoint>,ReadFailedException> mockFuture;
+    @Mock
+    private Optional<Endpoint> mockOption;
+    @Mock
+    private Endpoint mockEp;
+    
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        ctx =  new L2EprContext(mockPeer, mockRequest, 
+                TEST_SIZE,
+                mockProvider, executor);
+        ctx.setCallback(this);
+        
+    }
+
+    @Override
+    public void callback(L2EprContext ctx) {
+        callbacks++;
+    }
+
+    private static final String TEST_CONTEXT = "foo";
+    private static final String TEST_ID = "bar";
+    
+    @Test
+    public void testEpCreate() throws Exception {
+        when(mockId.getL2Context()).thenReturn(mockL2Context);
+        when(mockId.getL2Identity()).thenReturn(mockMac);
+        when(mockProvider.newWriteOnlyTransaction()).thenReturn(mockWriter);
+        
+        ctx.createL2Ep(TEST_CONTEXT, mockId);
+        verify(mockProvider).newWriteOnlyTransaction();
+        verify(mockWriter).submit();
+    }
+
+    @Test
+    public void testLookupEndpoint() throws Exception {
+        when(mockProvider.newReadOnlyTransaction()).thenReturn(mockReader);
+        when(mockReader.read(eq(LogicalDatastoreType.OPERATIONAL),
+                Matchers.<InstanceIdentifier<Endpoint>>any())).thenReturn(mockFuture);
+        ctx.lookupEndpoint(TEST_CONTEXT, TEST_ID);
+        verify(mockProvider).newReadOnlyTransaction();
+    }
+
+    @Test
+    public void testCallback() throws Exception {
+        callbacks = 0;
+        ctx.setCallback(this);
+        when(mockOption.get()).thenReturn(mockEp);
+        ctx.onSuccess(mockOption);
+        assertTrue(callbacks == 1);  
+    }
+
+}
diff --git a/groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/L3EprContextTest.java b/groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/L3EprContextTest.java
new file mode 100644 (file)
index 0000000..8e26f9b
--- /dev/null
@@ -0,0 +1,138 @@
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ *  Authors : Thomas Bachman
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointDeclarationRequest;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.L3ContextId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoint.fields.L3Address;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.Endpoint;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+
+
+
+/**
+ *
+ */
+public class L3EprContextTest implements L3EprContext.Callback {
+    protected static final Logger logger = LoggerFactory.getLogger(L3EprContextTest.class);
+
+    private static final int TEST_SIZE = 1;
+    L3EprContext ctx = null;
+    private int callbacks;
+    @Mock
+    private DataBroker mockProvider;
+    private ScheduledExecutorService executor;    
+    @Mock
+    private JsonRpcEndpoint mockPeer;
+    @Mock
+    private WriteTransaction mockWriter;
+    @Mock
+    private ReadOnlyTransaction mockReader;
+    @Mock
+    private EndpointDeclarationRequest mockRequest;
+    @Mock
+    private Identity mockId;
+    @Mock
+    private L3ContextId mockL3Context;
+    @Mock
+    private IpAddress mockIp;
+    @Mock
+    private List<L3Address> mockAddresses;
+    @Mock
+    private CheckedFuture<Optional<Endpoint>,ReadFailedException> mockFuture;
+    @Mock
+    private Optional<EndpointL3> mockOption;
+    @Mock
+    private EndpointL3 mockEp;
+    private List<String> dummyList;
+    
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        ctx =  new L3EprContext(mockPeer, mockRequest, 
+                TEST_SIZE,
+                mockProvider, executor);
+        ctx.setCallback(this);
+        
+    }
+
+    @Override
+    public void callback(L3EprContext ctx) {
+        callbacks++;
+    }
+
+    private static final String TEST_CONTEXT = "foo";
+    private static final String TEST_ID = "bar";
+    private static final String TEST_IP1 = "192.168.194.131";
+    private static final String TEST_IP2 = "192.168.194.132";
+    private static final String TEST_IP3 = "192.168.194.133";
+    
+    @Test
+    public void testEpCreate() throws Exception {
+        when(mockId.getL3Context()).thenReturn(mockL3Context);
+        when(mockId.getL3Identity()).thenReturn(mockIp);
+        when(mockId.getL3Addresses()).thenReturn(mockAddresses);
+        when(mockProvider.newWriteOnlyTransaction()).thenReturn(mockWriter);
+        dummyList = new ArrayList<String>();
+        dummyList.add(TEST_IP1);
+        dummyList.add(TEST_IP2);
+        dummyList.add(TEST_IP3);
+        ctx.createL3Ep(TEST_CONTEXT, dummyList ,mockId);
+        verify(mockProvider).newWriteOnlyTransaction();
+        verify(mockWriter).submit();
+    }
+
+    @Test
+    public void testLookupEndpoint() throws Exception {
+        when(mockProvider.newReadOnlyTransaction()).thenReturn(mockReader);
+        when(mockReader.read(eq(LogicalDatastoreType.OPERATIONAL),
+                Matchers.<InstanceIdentifier<Endpoint>>any())).thenReturn(mockFuture);
+        ctx.lookupEndpoint(TEST_CONTEXT, TEST_ID);
+        verify(mockProvider).newReadOnlyTransaction();
+    }
+
+    @Test
+    public void testCallback() throws Exception {
+        callbacks = 0;
+        ctx.setCallback(this);
+        when(mockOption.get()).thenReturn(mockEp);
+        ctx.onSuccess(mockOption);
+        assertTrue(callbacks == 1);  
+    }
+
+}
index 5676374a1a02b8878fa3a05954b7aa33eb59c316..cf0e1feb5462983cea2cbd2b5676923014358aa8 100644 (file)
 
 package org.opendaylight.groupbasedpolicy.renderer.opflex;
 
+import static io.netty.buffer.Unpooled.copiedBuffer;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.util.CharsetUtil;
 
@@ -32,6 +36,8 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFaile
 import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcDecoder;
 import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
 import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcServiceBinderHandler;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcBroker;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessageMap;
 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcServer;
 import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityResponse;
@@ -51,18 +57,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 
-import static io.netty.buffer.Unpooled.*;
-
-import static org.junit.Assert.*;
-
-import static org.mockito.Mockito.*;
-
 /**
  *
  * Test the serialization and deserialization of RPC Messages,
  * and check against expected structure and values.
  */
-public class OpflexConnectionServiceTest {
+public class OpflexConnectionServiceTest implements RpcBroker.RpcCallback {
     protected static final Logger logger = LoggerFactory.getLogger(OpflexMessageTest.class);
 
     static private final String TEST_EP_UUID = "85d53c32-47af-4eaf-82fd-ced653ff74da";
@@ -144,6 +144,11 @@ public class OpflexConnectionServiceTest {
         }
     }
 
+    @Override
+    public void callback(JsonRpcEndpoint endpoint, RpcMessage request) {
+        opflexService.callback(endpoint, request);
+    }
+    
     @Before
     public void setUp() throws Exception {
         MockitoAnnotations.initMocks(this);
@@ -251,7 +256,11 @@ public class OpflexConnectionServiceTest {
          */
         opflexService = new OpflexConnectionService();
         opflexService.setDataProvider(mockDataBroker);
-
+        List<RpcMessage> messages = Role.POLICY_REPOSITORY.getMessages();
+        for (RpcMessage msg: messages) {
+            opflexService.subscribe(msg, this);
+        }
+        
         ObjectMapper objectMapper = new ObjectMapper();
         objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
         decoder = new JsonRpcDecoder(1000);