Added support for OpFlex domains. 53/9153/2
authorThomas Bachman <tbachman@yahoo.com>
Thu, 17 Jul 2014 07:14:22 +0000 (07:14 +0000)
committerThomas Bachman <tbachman@yahoo.com>
Fri, 18 Jul 2014 05:16:02 +0000 (05:16 +0000)
This adds domains to the scoping of OpFlex servers and
messages.

This patch also moves the OpFlex-specific messages to
their own package, and adds fixes for using concrete
typing for Futures.

Change-Id: I67a9cfeebb5d40490cfa02780cf7c9b3d2acfb74
Signed-off-by: Thomas Bachman <tbachman@yahoo.com>
36 files changed:
groupbasedpolicy/src/main/java/org/opendaylight/controller/config/yang/config/opflex_provider/impl/OpflexProviderModule.java
groupbasedpolicy/src/main/java/org/opendaylight/controller/config/yang/config/opflex_provider/impl/OpflexProviderModuleFactory.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/jsonrpc/ConnectionService.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/jsonrpc/JsonRpcEndpoint.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/jsonrpc/RpcServer.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/OFOverlayRenderer.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/SwitchManager.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/flow/FlowTable.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexAgent.java [new file with mode: 0644]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexConnectionService.java
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexDomain.java [new file with mode: 0644]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexRpcServer.java [new file with mode: 0644]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/Role.java [new file with mode: 0644]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/EndpointDeclarationRequest.java [moved from groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/EndpointDeclarationRequest.java with 98% similarity]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/EndpointDeclarationResponse.java [moved from groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/EndpointDeclarationResponse.java with 96% similarity]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/EndpointPolicyUpdateRequest.java [moved from groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/EndpointPolicyUpdateRequest.java with 98% similarity]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/EndpointPolicyUpdateResponse.java [moved from groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/EndpointPolicyUpdateResponse.java with 96% similarity]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/EndpointRequestRequest.java [moved from groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/EndpointRequestRequest.java with 97% similarity]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/EndpointRequestResponse.java [moved from groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/EndpointRequestResponse.java with 98% similarity]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/IdentityRequest.java [moved from groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/IdentityRequest.java with 90% similarity]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/IdentityResponse.java [moved from groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/IdentityResponse.java with 98% similarity]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/PolicyResolutionRequest.java [moved from groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/PolicyResolutionRequest.java with 97% similarity]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/PolicyResolutionResponse.java [moved from groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/PolicyResolutionResponse.java with 97% similarity]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/PolicyTriggerRequest.java [moved from groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/PolicyTriggerRequest.java with 97% similarity]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/PolicyTriggerResponse.java [moved from groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/PolicyTriggerResponse.java with 96% similarity]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/PolicyUpdateRequest.java [moved from groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/PolicyUpdateRequest.java with 97% similarity]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/PolicyUpdateResponse.java [moved from groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/PolicyUpdateResponse.java with 96% similarity]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/StateReportRequest.java [moved from groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/StateReportRequest.java with 97% similarity]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/StateReportResponse.java [moved from groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/StateReportResponse.java with 96% similarity]
groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/resolver/PolicyResolver.java
groupbasedpolicy/src/main/yang/renderer/opflex/opflex.yang
groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexConnectionServiceTest.java
groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexDomainTest.java [new file with mode: 0644]
groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexMessageTest.java
groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexRpcServerTest.java [new file with mode: 0644]
groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/RoleTest.java [new file with mode: 0644]

index 10a12419c834a31e2a52eb7deefec23f6f0ff47e..5ce3532970d113df099951135d13e28ccf830719 100644 (file)
@@ -30,7 +30,7 @@ public class OpflexProviderModule extends org.opendaylight.controller.config.yan
         final ListenerRegistration<DataChangeListener> dataChangeListenerRegistration =
                 dataBrokerService
                 .registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
-                        OpflexConnectionService.DISCOVERY_DEFINITIONS_IID,
+                        OpflexConnectionService.DOMAINS_IID,
                         connectionService, DataChangeScope.SUBTREE );
 
         final class AutoCloseableConnectionService implements AutoCloseable {
index 1388cc4b95618745bfddb4f6566a7447580a06b9..621b5e9b196a477cd06022da12b455f8408b361c 100644 (file)
@@ -3,7 +3,7 @@
 *
 * Generated from: yang module name: opflex-provider-impl yang module local name: opflex-provider-impl
 * Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
-* Generated at: Mon Jul 07 21:34:41 UTC 2014
+* Generated at: Wed Jul 16 08:30:30 UTC 2014
 *
 * Do not modify this file unless it is present under src/main directory
 */
index 3ced68a2c0f010ca1dbc09b6844812deec0eab60..c29755a7d0063692de57b9e8596d94e0ca151881 100644 (file)
@@ -10,7 +10,7 @@
 package org.opendaylight.groupbasedpolicy.jsonrpc;
 
 
-/*
+/**
  * An interface to provide notifications when connections are
  * established or closed. The connection notifications
  * use{@link RpcEncpoint} objects; as connections come and go,
index feb12ea9abb581a25a4f968c269ea6867cdddb12..1faef8a6019b79bf98168594c33592f117595c60 100644 (file)
@@ -14,7 +14,6 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -70,6 +69,7 @@ public class JsonRpcEndpoint implements ChannelFutureListener {
     }
 
     private String identifier;
+    private Object context;
     private ObjectMapper objectMapper;
     private Channel nettyChannel;
     private Map<String, CallContext> methodContext = Maps.newHashMap();
@@ -85,6 +85,14 @@ public class JsonRpcEndpoint implements ChannelFutureListener {
         this.identifier = identifier;
     }
 
+    public Object getContext() {
+        return context;
+    }
+
+    public void setContext(Object context) {
+        this.context = context;
+    }
+
     public ConnectionService getConnectionService() {
         return connectionService;
     }
@@ -96,10 +104,6 @@ public class JsonRpcEndpoint implements ChannelFutureListener {
         return nettyChannel;
     }
 
-    public boolean supportsMessages(List<RpcMessage> messages) {
-        return messageMap.containsMessages(messages);
-    }
-
     public JsonRpcEndpoint(String identifier, ConnectionService connectionService,
             ObjectMapper objectMapper, Channel channel,
             RpcMessageMap messageMap, RpcBroker broker) {
index d903a8e40d8e442b981a5d618dec12d16a51da11..853512fba0e499c343d7ce19473fdbe07c86e837 100644 (file)
@@ -51,6 +51,7 @@ public class RpcServer {
     String identity;
     int listenPort;
     Channel channel;
+    Object context;
     RpcMessageMap messageMap;
     ConnectionService connectionService;
     RpcBroker broker;
@@ -65,6 +66,14 @@ public class RpcServer {
         this.identity = identity;
     }
 
+    public Object getContext() {
+        return context;
+    }
+
+    public void setContext(Object context) {
+        this.context = context;
+    }
+
     public void addMessage(RpcMessage message) {
         this.messageMap.add(message);
     }
@@ -98,6 +107,7 @@ public class RpcServer {
 
         JsonRpcEndpoint endpoint = new JsonRpcEndpoint(identifier, connectionService,
                 objectMapper, channel, messageMap, broker);
+        endpoint.setContext(context);
         JsonRpcServiceBinderHandler binderHandler =
                 new JsonRpcServiceBinderHandler(endpoint);
         channel.pipeline().addLast(binderHandler);
index bec0b000c8f511d7305cf42e2338ef0d3e0ab253..821832563236872ad44b9bba8024cbb169588fc0 100644 (file)
@@ -36,7 +36,7 @@ import com.google.common.util.concurrent.ListenableFuture;
  * @author readams
  */
 public class OFOverlayRenderer implements AutoCloseable, DataChangeListener {
-    private static final Logger LOG = 
+    private static final Logger LOG =
             LoggerFactory.getLogger(OFOverlayRenderer.class);
 
     private final DataBroker dataBroker;
@@ -44,12 +44,12 @@ public class OFOverlayRenderer implements AutoCloseable, DataChangeListener {
     private final SwitchManager switchManager;
     private final EndpointManager endpointManager;
     private final PolicyManager policyManager;
-    
+
     private final ScheduledExecutorService executor;
 
-    private static final InstanceIdentifier<OfOverlayConfig> configIid = 
+    private static final InstanceIdentifier<OfOverlayConfig> configIid =
             InstanceIdentifier.builder(OfOverlayConfig.class).build();
-    
+
     private OfOverlayConfig config;
     ListenerRegistration<DataChangeListener> configReg;
 
@@ -60,23 +60,23 @@ public class OFOverlayRenderer implements AutoCloseable, DataChangeListener {
 
         int numCPU = Runtime.getRuntime().availableProcessors();
         executor = Executors.newScheduledThreadPool(numCPU * 2);
-        
+
         policyResolver = new PolicyResolver(dataProvider, executor);
         switchManager = new SwitchManager(dataProvider, executor);
-        endpointManager = new EndpointManager(dataProvider, rpcRegistry, 
+        endpointManager = new EndpointManager(dataProvider, rpcRegistry,
                                               executor, switchManager);
-        
+
         policyManager = new PolicyManager(dataProvider,
-                                          policyResolver, 
+                                          policyResolver,
                                           switchManager,
                                           endpointManager,
                                           rpcRegistry,
                                           executor);
-        
-        configReg = 
-                dataProvider.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, 
-                                                        configIid, 
-                                                        this, 
+
+        configReg =
+                dataProvider.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
+                                                        configIid,
+                                                        this,
                                                         DataChangeScope.SUBTREE);
         readConfig();
         LOG.info("Initialized OFOverlay renderer");
@@ -95,13 +95,13 @@ public class OFOverlayRenderer implements AutoCloseable, DataChangeListener {
         if (switchManager != null) switchManager.close();
         if (endpointManager != null) endpointManager.close();
     }
-    
+
     // ******************
     // DataChangeListener
     // ******************
-    
+
     @Override
-    public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, 
+    public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>,
                                                    DataObject> change) {
         readConfig();
     }
@@ -109,14 +109,14 @@ public class OFOverlayRenderer implements AutoCloseable, DataChangeListener {
     // **************
     // Implementation
     // **************
-    
+
     private void readConfig() {
-        ListenableFuture<Optional<DataObject>> dao = 
+        ListenableFuture<Optional<OfOverlayConfig>> dao =
                 dataBroker.newReadOnlyTransaction()
                     .read(LogicalDatastoreType.CONFIGURATION, configIid);
-        Futures.addCallback(dao, new FutureCallback<Optional<DataObject>>() {
+        Futures.addCallback(dao, new FutureCallback<Optional<OfOverlayConfig>>() {
             @Override
-            public void onSuccess(final Optional<DataObject> result) {
+            public void onSuccess(final Optional<OfOverlayConfig> result) {
                 if (!result.isPresent()) return;
                 if (result.get() instanceof OfOverlayConfig) {
                     config = (OfOverlayConfig)result.get();
@@ -130,7 +130,7 @@ public class OFOverlayRenderer implements AutoCloseable, DataChangeListener {
             }
         }, executor);
     }
-    
+
     private void applyConfig() {
         switchManager.setEncapsulationFormat(config.getEncapsulationFormat());
         endpointManager.setLearningMode(config.getLearningMode());
index 9bd035f2c3739e597a2a2ffabfc26a13c25aa22c..1f1fed215331b7f933692052a4cfbd0ac218e281 100644 (file)
@@ -39,12 +39,12 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
 /**
- * Manage connected switches and ensure their configuration is set up 
+ * Manage connected switches and ensure their configuration is set up
  * correctly
  * @author readams
  */
 public class SwitchManager implements AutoCloseable, DataChangeListener {
-    private static final Logger LOG = 
+    private static final Logger LOG =
             LoggerFactory.getLogger(SwitchManager.class);
 
     private final DataBroker dataProvider;
@@ -56,7 +56,7 @@ public class SwitchManager implements AutoCloseable, DataChangeListener {
                 .child(Node.class).build();
     private ListenerRegistration<DataChangeListener> nodesReg;
 
-    private ConcurrentHashMap<NodeId, SwitchState> switches = 
+    private ConcurrentHashMap<NodeId, SwitchState> switches =
             new ConcurrentHashMap<>();
     private List<SwitchListener> listeners = new CopyOnWriteArrayList<>();
 
@@ -65,8 +65,8 @@ public class SwitchManager implements AutoCloseable, DataChangeListener {
         super();
         this.dataProvider = dataProvider;
         nodesReg = dataProvider
-                .registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, 
-                                            nodeIid, this, 
+                .registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
+                                            nodeIid, this,
                                             DataChangeScope.ONE);
         readSwitches();
         LOG.debug("Initialized OFOverlay switch manager");
@@ -75,22 +75,22 @@ public class SwitchManager implements AutoCloseable, DataChangeListener {
     // *************
     // SwitchManager
     // *************
-    
+
     /**
      * Get the collection of switches that are in the "ready" state.  Note
      * that the collection may be concurrently modified
      * @return A {@link Collection} containing the switches that are ready.
      */
     public Collection<NodeId> getReadySwitches() {
-        Collection<SwitchState> ready = 
-                Collections2.filter(switches.values(), 
+        Collection<SwitchState> ready =
+                Collections2.filter(switches.values(),
                             new Predicate<SwitchState>() {
                     @Override
                     public boolean apply(SwitchState input) {
-                        return SwitchStatus.READY.equals(input.status); 
+                        return SwitchStatus.READY.equals(input.status);
                     }
                 });
-        return Collections2.transform(ready, 
+        return Collections2.transform(ready,
                                       new Function<SwitchState, NodeId>() {
             @Override
             public NodeId apply(SwitchState input) {
@@ -109,7 +109,7 @@ public class SwitchManager implements AutoCloseable, DataChangeListener {
         if (state == null) return false;
         return SwitchStatus.READY.equals(state.status);
     }
-    
+
     /**
      * Add a {@link SwitchListener} to get notifications of switch events
      * @param listener the {@link SwitchListener} to add
@@ -140,7 +140,7 @@ public class SwitchManager implements AutoCloseable, DataChangeListener {
     // ******************
 
     @Override
-    public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, 
+    public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>,
                                                    DataObject> change) {
         for (InstanceIdentifier<?> iid : change.getRemovedPaths()) {
             LOG.info("{} removed", iid);
@@ -158,11 +158,11 @@ public class SwitchManager implements AutoCloseable, DataChangeListener {
             updateSwitch(dao);
         }
     }
-    
+
     // **************
     // Implementation
     // **************
-    
+
     private void updateSwitch(DataObject dao) {
         if (!(dao instanceof Node)) return;
         // Switches are registered as Nodes in the inventory; OpenFlow switches
@@ -172,24 +172,24 @@ public class SwitchManager implements AutoCloseable, DataChangeListener {
         if (fcn == null) return;
 
         LOG.debug("{} update", node.getId());
-        
-        SwitchState state = switches.get(node.getId()); 
+
+        SwitchState state = switches.get(node.getId());
         if (state == null) {
             state = new SwitchState(node);
-            SwitchState old = 
+            SwitchState old =
                     switches.putIfAbsent(node.getId(), state);
             if (old == null) {
                 switchConnected(node.getId());
             }
         }
     }
-    
-    // XXX there's a race condition here if a switch exists at startup and is 
+
+    // XXX there's a race condition here if a switch exists at startup and is
     // removed very quickly.
-    private final FutureCallback<Optional<DataObject>> readSwitchesCallback =
-            new FutureCallback<Optional<DataObject>>() {
+    private final FutureCallback<Optional<Nodes>> readSwitchesCallback =
+            new FutureCallback<Optional<Nodes>>() {
         @Override
-        public void onSuccess(Optional<DataObject> result) {
+        public void onSuccess(Optional<Nodes> result) {
             if (result.isPresent() && result.get() instanceof Nodes) {
                 Nodes nodes = (Nodes)result.get();
                 for (Node node : nodes.getNode()) {
@@ -203,22 +203,22 @@ public class SwitchManager implements AutoCloseable, DataChangeListener {
             LOG.error("Count not read switch information", t);
         }
     };
-    
+
     /**
      * Read the set of switches from the ODL inventory and update our internal
      * map.
-     * 
-     * <p>This is safe only if there can only be one notification at a time, 
-     * as there are race conditions in the face of concurrent data change 
+     *
+     * <p>This is safe only if there can only be one notification at a time,
+     * as there are race conditions in the face of concurrent data change
      * notifications
      */
     private void readSwitches() {
-        ListenableFuture<Optional<DataObject>> future = 
+        ListenableFuture<Optional<Nodes>> future =
                 dataProvider.newReadOnlyTransaction()
                     .read(LogicalDatastoreType.OPERATIONAL,nodesIid);
         Futures.addCallback(future, readSwitchesCallback);
     }
-    
+
     /**
      * Set the ready state of the node to PREPARING and begin the initialization
      * process
@@ -232,7 +232,7 @@ public class SwitchManager implements AutoCloseable, DataChangeListener {
             LOG.info("New switch {} connected", nodeId);
         }
     }
-    
+
     /**
      * Set the ready state of the node to READY and notify listeners
      */
@@ -257,7 +257,7 @@ public class SwitchManager implements AutoCloseable, DataChangeListener {
         }
         LOG.info("Switch {} removed", nodeId);
     }
-    
+
     private enum SwitchStatus {
         /**
          * The switch is connected but not yet configured
@@ -268,7 +268,7 @@ public class SwitchManager implements AutoCloseable, DataChangeListener {
          */
         READY
     }
-    
+
     /**
      * Internal representation of the state of a connected switch
      */
@@ -279,7 +279,7 @@ public class SwitchManager implements AutoCloseable, DataChangeListener {
             super();
             this.switchNode = switchNode;
         }
-        
+
     }
 
 }
index cf77368ebc613ee0e69bdd152cc38ce2e603277f..41e1e7ef7ecbc6b176291cdb82c882fa1d129cfb 100644 (file)
@@ -26,7 +26,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
@@ -44,7 +43,7 @@ import com.google.common.util.concurrent.ListenableFuture;
  * @author readams
  */
 public abstract class FlowTable {
-    protected static final Logger LOG = 
+    protected static final Logger LOG =
             LoggerFactory.getLogger(FlowTable.class);
 
     /**
@@ -53,13 +52,13 @@ public abstract class FlowTable {
     public static class FlowTableCtx {
         protected final DataBroker dataBroker;
         protected final RpcProviderRegistry rpcRegistry;
-        
+
         protected final PolicyManager policyManager;
         protected final SwitchManager switchManager;
         protected final EndpointManager endpointManager;
-        
+
         protected final PolicyResolver policyResolver;
-        
+
         protected final ScheduledExecutorService executor;
 
         public FlowTableCtx(DataBroker dataBroker,
@@ -78,9 +77,9 @@ public abstract class FlowTable {
             this.policyResolver = policyResolver;
             this.executor = executor;
         }
-        
+
     }
-    
+
     protected final FlowTableCtx ctx;
 
     public FlowTable(FlowTableCtx ctx) {
@@ -91,18 +90,18 @@ public abstract class FlowTable {
     // *********
     // FlowTable
     // *********
-    
+
     /**
-     * Update the relevant flow table for the node 
+     * Update the relevant flow table for the node
      * @param nodeId the node to update
      * @param dirty the dirty set
-     * @throws Exception 
+     * @throws Exception
      */
     public void update(NodeId nodeId, Dirty dirty) throws Exception {
         ReadWriteTransaction t = ctx.dataBroker.newReadWriteTransaction();
-        InstanceIdentifier<Table> tiid = 
+        InstanceIdentifier<Table> tiid =
                 FlowUtils.createTablePath(nodeId, getTableId());
-        Optional<DataObject> r = 
+        Optional<Table> r =
                 t.read(LogicalDatastoreType.CONFIGURATION, tiid).get();
 
         HashMap<String, FlowCtx> flowMap = new HashMap<>();
@@ -125,25 +124,25 @@ public abstract class FlowTable {
                          FlowUtils.createFlowPath(tiid, fx.f.getKey()));
             }
         }
-        
+
         ListenableFuture<RpcResult<TransactionStatus>> result = t.commit();
         Futures.addCallback(result, updateCallback);
     }
 
     /**
      * Sync flow state using the flow map
-     * @throws Exception 
+     * @throws Exception
      */
     public abstract void sync(ReadWriteTransaction t,
                               InstanceIdentifier<Table> tiid,
                               Map<String, FlowCtx> flowMap,
                               NodeId nodeId, Dirty dirty) throws Exception;
-    
+
     /**
      * Get the table ID being manipulated
      */
     public abstract short getTableId();
-    
+
     // ***************
     // Utility methods
     // ***************
@@ -157,8 +156,8 @@ public abstract class FlowTable {
             .setBarrier(false)
             .setHardTimeout(0)
             .setIdleTimeout(0);
-    }    
-    
+    }
+
     /**
      * Generic callback for handling result of flow manipulation
      * @author readams
@@ -176,7 +175,7 @@ public abstract class FlowTable {
 
         @Override
         public void onFailure(Throwable t) {
-            LOG.error("Failed to add flow entry", t);            
+            LOG.error("Failed to add flow entry", t);
         }
     }
     protected static final FlowCallback<TransactionStatus> updateCallback =
@@ -189,7 +188,7 @@ public abstract class FlowTable {
      * @param flowId the ID for the flow
      * @return <code>true</code> if the flow needs to be added
      */
-    protected static boolean visit(Map<String, FlowCtx> flowMap, 
+    protected static boolean visit(Map<String, FlowCtx> flowMap,
                                    String flowId) {
         FlowCtx c = flowMap.get(flowId);
         if (c != null) {
@@ -198,7 +197,7 @@ public abstract class FlowTable {
         }
         return true;
     }
-    
+
     /**
      * Write the given flow to the transaction
      */
@@ -206,11 +205,11 @@ public abstract class FlowTable {
                                     InstanceIdentifier<Table> tiid,
                                     Flow flow) {
         LOG.trace("{} {}", flow.getId(), flow);
-        t.put(LogicalDatastoreType.CONFIGURATION, 
-              FlowUtils.createFlowPath(tiid, flow.getId()), 
+        t.put(LogicalDatastoreType.CONFIGURATION,
+              FlowUtils.createFlowPath(tiid, flow.getId()),
               flow);
     }
-    
+
     /**
      * Context object for keeping track of flow state
      */
diff --git a/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexAgent.java b/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexAgent.java
new file mode 100644 (file)
index 0000000..18e8cc2
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.List;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
+
+public class OpflexAgent {
+    String identity;
+    String domain;
+    List<Role> roles;
+    JsonRpcEndpoint endpoint;
+    OpflexRpcServer opflexServer;
+
+    public OpflexAgent() {
+    }
+
+    public String getDomain() {
+        return domain;
+    }
+
+    public void setDomain(String domain) {
+        this.domain = domain;
+    }
+
+    public String getIdentity() {
+        return identity;
+    }
+
+    public void setIdentity(String identity) {
+        this.identity = identity;
+    }
+
+    public OpflexRpcServer getOpflexServer() {
+        return opflexServer;
+    }
+
+    public void setOpflexServer(OpflexRpcServer server) {
+        this.opflexServer = server;
+    }
+
+    public List<Role> getRoles() {
+        return roles;
+    }
+
+    public void setRoles(List<Role> roles) {
+        this.roles = roles;
+    }
+
+    public JsonRpcEndpoint getEndpoint() {
+        return endpoint;
+    }
+
+    public void setEndpoint(JsonRpcEndpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+}
index d16fac15297da53c25e3f6c2da3e60b6757581e7..e3d6afe09927542f1a48ab360906e83bed263c16 100644 (file)
@@ -14,15 +14,16 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.groupbasedpolicy.jsonrpc.ConnectionService;
@@ -30,19 +31,28 @@ import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcBroker;
 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcServer;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DiscoveryDefinitions;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.EndpointRegistry;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.Observer;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.PolicyRepository;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityResponse;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.Domains;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DomainsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.Domain;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.DomainKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.DiscoveryDefinitions;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.discovery.definitions.EndpointRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.discovery.definitions.Observer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.discovery.definitions.PolicyRepository;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
-/*
+/**
  * Manages the different OpFlex entity connections. It does this
  * on behalf of each logical OpFlex entity:
  *    o  Policy Repositories
@@ -54,9 +64,9 @@ import com.google.common.util.concurrent.ListenableFuture;
  * Servers and connections are maintained in dedicated client and
  * server maps.
  *
- * TODO: calls to add messages to policy repository, EP registry , and observer
- * TODO: incorporate OpFlex domain
- * TODO: break into smaller pieces?
+ * @author tbachman
+ *
+ * TODO: Still too big - need to separate
  */
 public class OpflexConnectionService
     implements ConnectionService, RpcBroker,
@@ -64,112 +74,9 @@ public class OpflexConnectionService
     protected static final Logger logger =
             LoggerFactory.getLogger(OpflexConnectionService.class);
 
-    public enum Role {
-        POLICY_REPOSITORY("policy_repository"),
-        ENDPOINT_REGISTRY("endpoint_registry"),
-        OBSERVER("observer"),
-        POLICY_ELEMENT("policy_element");
-
-        private String role;
-        Role(String role) {
-            this.role = role;
-        }
-        @Override
-        public String toString() {
-            return this.role;
-        }
-    }
-
-    private static class OpflexConnection {
-        String identity;
-        List<Role> roles;
-        JsonRpcEndpoint endpoint;
-
-        public OpflexConnection() {
-        }
-
-        public String getIdentity() {
-            return identity;
-        }
-
-        public void setIdentity(String identity) {
-            this.identity = identity;
-        }
-
-        public List<Role> getRoles() {
-            return roles;
-        }
-
-        public void setRoles(List<Role> roles) {
-            this.roles = roles;
-        }
-
-        public JsonRpcEndpoint getEndpoint() {
-            return endpoint;
-        }
-
-        public void setEndpoint(JsonRpcEndpoint endpoint) {
-            this.endpoint = endpoint;
-        }
-
-    }
-
-    public static class OpflexRpcServer {
-        private String identity;
-        private List<Role> roles;
-        private RpcServer server;
-
-        public OpflexRpcServer() {
-            roles = new ArrayList<Role>();
-        }
-
-        public OpflexRpcServer(String identity) {
-            this.identity = identity;
-        }
-
-        public OpflexRpcServer(String identity, List<Role> roles) {
-            this.identity = identity;
-            this.roles = roles;
-        }
-
-        public String getId() {
-            return this.identity;
-        }
-
-        public void setRpcServer(RpcServer server) {
-            this.server = server;
-        }
-
-        public RpcServer getRpcServer() {
-            return this.server;
-        }
-
-        public void addRole(Role role) {
-            if (!this.roles.contains(role))
-                this.roles.add(role);
-        }
-
-        public List<Role> getRoles() {
-            return this.roles;
-        }
-
-        public boolean sameServer(OpflexRpcServer srv) {
-            if (this == srv)
-                return true;
-            if (srv == null)
-                return false;
-            if (!this.identity.equals(srv.identity))
-                return false;
-            if (this.roles == null && srv.roles == null)
-                return true;
-            if (this.roles == null || srv.roles == null)
-                return false;
-            if (this.roles.size() == srv.roles.size() && this.roles.containsAll(srv.roles))
-                return true;
-            return false;
-        }
-    }
 
+    static final String OPFLEX_DOMAIN = "default";
+    static final String INVALID_DOMAIN = "Domain mismatch";
     // Properties that can be set in config.ini
     static final String OPFLEX_LISTENPORT = "opflex.listenPort";
     private static final Integer defaultOpflexPort = 6670;
@@ -179,17 +86,26 @@ public class OpflexConnectionService
     private Integer opflexListenPort = defaultOpflexPort;
     private String opflexListenIp = defaultOpflexIp;
 
-    ConcurrentMap<String, OpflexConnection> opflexAgents = null;
-    ConcurrentMap<String, OpflexRpcServer> opflexServers = null;
+    private final ScheduledExecutorService executor;
+
+    List<Domain> domainList = null;
+    ConcurrentMap<String, OpflexDomain> opflexDomains = null;
     ConcurrentMap<String, List<RpcCallback>> brokerMap = null;
-    List<RpcMessage> policyRepositoryMessages;
-    List<RpcMessage> endpointRegistryMessages;
-    List<RpcMessage> observerMessages;
+
     private DataBroker dataProvider;
 
-    public static final InstanceIdentifier<DiscoveryDefinitions>  DISCOVERY_DEFINITIONS_IID =
-            InstanceIdentifier.builder(DiscoveryDefinitions.class).build();
+    public static final InstanceIdentifier<Domains> DOMAINS_IID =
+            InstanceIdentifier.builder(Domains.class).build();
 
+    public InstanceIdentifier<Domain> domainIid(DomainKey domainKey) {
+        return InstanceIdentifier.builder(Domains.class).child(Domain.class, domainKey)
+                .build();
+    }
+
+    public OpflexConnectionService() {
+        int numCPU = Runtime.getRuntime().availableProcessors();
+        executor = Executors.newScheduledThreadPool(numCPU * 2);
+    }
 
     /**
      *
@@ -201,27 +117,10 @@ public class OpflexConnectionService
     public void setDataProvider(DataBroker salDataProvider) {
         dataProvider = salDataProvider;
 
-        startOpflexManager();
-    }
-
-    private DiscoveryDefinitions getDiscoveryDefinitions() {
-
-        ReadTransaction t = dataProvider.newReadOnlyTransaction();
-        ListenableFuture<Optional<DataObject>> f = t.read(LogicalDatastoreType.CONFIGURATION, DISCOVERY_DEFINITIONS_IID);
-        try {
-            Optional<DataObject> dao = f.get();
-            if (dao.get() != null && dao.get() instanceof DiscoveryDefinitions) {
-                return (DiscoveryDefinitions)dao.get();
-            }
-        }
-        catch ( Exception e ) {
-            logger.warn("Not sure what happens here");
-        }
-        return null;
-
+        start();
     }
 
-    private List<OpflexRpcServer> setDefaultIdentities() {
+    private List<OpflexRpcServer> setDefaultIdentities(OpflexDomain domain) {
 
         /*
          * Create a single server, filling all roles
@@ -232,39 +131,82 @@ public class OpflexConnectionService
         roles.add(Role.POLICY_REPOSITORY);
         roles.add(Role.ENDPOINT_REGISTRY);
         roles.add(Role.OBSERVER);
-        OpflexRpcServer srv = new OpflexRpcServer(identity, roles);
+
+        OpflexDomain od = new OpflexDomain();
+        od.setDomain(OPFLEX_DOMAIN);
+        OpflexRpcServer srv = new OpflexRpcServer(od, identity, roles);
+        srv.setConnectionService(this);
+        srv.setRpcBroker(this);
         srvList.add(srv);
         return srvList;
 
     }
 
-    private List<OpflexRpcServer> createServerList() {
-        DiscoveryDefinitions identities = getDiscoveryDefinitions();
+    private List<OpflexRpcServer> createServerList(OpflexDomain d, Domain domain) {
+
+        DiscoveryDefinitions identities = domain.getDiscoveryDefinitions();
         if (identities != null) {
             Map<String, OpflexRpcServer> servers =
                     new ConcurrentHashMap<String, OpflexRpcServer>();
             List<String> addList = getPolicyRepositories(identities.getPolicyRepository());
-            addServerList(servers, addList, Role.POLICY_REPOSITORY);
+            addServerList(d, servers, addList, Role.POLICY_REPOSITORY);
             addList = getEndpointRegistries(identities.getEndpointRegistry());
-            addServerList(servers, addList, Role.ENDPOINT_REGISTRY);
+            addServerList(d, servers, addList, Role.ENDPOINT_REGISTRY);
             addList = getObservers(identities.getObserver());
-            addServerList(servers, addList, Role.OBSERVER);
+            addServerList(d, servers, addList, Role.OBSERVER);
             return(new ArrayList<OpflexRpcServer>(servers.values()));
         }
-        else {
-            return setDefaultIdentities();
-        }
+        return null;
+    }
+
+    private void initializeConfig() {
+        // XXX - This is a hack to avoid a bug in the data broker
+        // API where you have to write all the parents before you can write
+        // a child
+        WriteTransaction t = dataProvider.newWriteOnlyTransaction();
+        t.put(LogicalDatastoreType.CONFIGURATION, DOMAINS_IID, new DomainsBuilder().build());
+        ListenableFuture<RpcResult<TransactionStatus>> f = t.commit();
+        Futures.addCallback(f, new FutureCallback<RpcResult<TransactionStatus>>() {
+
+            @Override
+            public void onSuccess(RpcResult<TransactionStatus> result) {
+
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                logger.error("Could not write domain base container", t);
+            }
+        });
     }
 
     private void initializeServers() {
 
+        OpflexDomain od;
+
+        //initializeConfig();
+        readConfig();
         /*
          * Get the configured identities, if any. If lists are empty,
          * set up a single instance of each, using the localhost
-         * interface
+         * interface, all inside a default domain
          */
-        List<OpflexRpcServer> serverList = createServerList();
-        addServers(serverList);
+        if (domainList != null && domainList.size() > 0) {
+            for (Domain d : domainList) {
+                od = opflexDomains.get(d.getId());
+                if (od == null) continue;
+                List<OpflexRpcServer> serverList = createServerList(od, d);
+                od.addServers(serverList);
+            }
+        }
+        else {
+            // TODO: should also write into config store?
+            logger.warn("Setting default identities");
+            od = new OpflexDomain();
+            od.setDomain(OPFLEX_DOMAIN);
+            od.addServers(setDefaultIdentities(od));
+            opflexDomains.put(od.getDomain(), od);
+        }
     }
 
 
@@ -295,7 +237,7 @@ public class OpflexConnectionService
         return identityList;
     }
 
-    private void addServerList( Map<String, OpflexRpcServer> servers,
+    private void addServerList( OpflexDomain d, Map<String, OpflexRpcServer> servers,
             List<String> idList, Role role ) {
         if (idList == null || idList.size() <= 0)
             return;
@@ -309,109 +251,77 @@ public class OpflexConnectionService
             }
 
             roles.add(role);
-            srv = new OpflexRpcServer(id, roles);
+            srv = new OpflexRpcServer(d, id, roles);
+            srv.setConnectionService(this);
+            srv.setRpcBroker(this);
             servers.put(id, srv);
         }
 
     }
 
-    private void launchRpcServer(OpflexRpcServer srv) {
-        RpcServer rpcSrv = new RpcServer(srv.getId().split(":")[0],
-                Integer.parseInt(srv.getId().split(":")[1]));
-        rpcSrv.setConnectionService(this);
-        rpcSrv.setRpcBroker(this);
 
-        /*
-         * Make sure the server is configured for the proper messages
-         */
-        List<Role> roles = srv.getRoles();
-        for ( Role role : roles ) {
-            switch (role) {
-                case POLICY_REPOSITORY:
-                {
-                    rpcSrv.addMessageList(this.policyRepositoryMessages);
-                }
-                break;
-                case ENDPOINT_REGISTRY:
-                {
-                    rpcSrv.addMessageList(this.endpointRegistryMessages);
-                }
-                break;
-                case OBSERVER:
-                {
-                    rpcSrv.addMessageList(this.observerMessages);
-                }
-                break;
-                default:
-                {
-                    logger.warn("Invalid Role {}", role );
-                }
-                break;
-            }
-        }
 
-        srv.setRpcServer(rpcSrv);
-        opflexServers.put(srv.getId(), srv);
+    /**
+     * We store the {@link OpflexDomain} in the {@link JsonRpcEndpoint}'s
+     * context field when the {@link RpcServer} creates the new connection.
+     *
+     * @param endpoint The endpoint to look up
+     * @return The OpflexDomain that owns this endpoint
+     *
+     * TODO: should throw an exception of there is no
+     * OpflexDomain that contains this endpoint
+     */
+    public OpflexDomain getOpflexDomain(JsonRpcEndpoint endpoint) {
+        if (endpoint.getContext() instanceof OpflexRpcServer) {
+            OpflexRpcServer srv = (OpflexRpcServer)endpoint.getContext();
+            return srv.getDomain();
+        }
+        logger.warn("endpoint {} does not have a domain", endpoint.getIdentifier());
+        return null;
+    }
 
-        new Thread() {
-            private RpcServer server;
+    /**
+     * Find the {@link OpflexAgent} that owns this
+     * {@link JsonRpcEndpoint}.
+     *
+     * @param endpoint The endpoint to look up
+     * @return The OpflexConnection that owns this endpoint
+     *
+     * TODO: should throw an exception of there is no
+     * OpflexConnection that contains this endpoint
+     */
+    public OpflexAgent getOpflexConnection(JsonRpcEndpoint endpoint) {
 
-            public Thread initializeServerParams(RpcServer server) {
-                this.server = server;
-                return this;
-            }
-            @Override
-            public void run() {
-                try {
-                    server.start();
-                } catch (Exception e) {
-                    logger.warn("Exception starting new server {}", e);
-                }
-            }
-        }.initializeServerParams(rpcSrv).start();
+        OpflexDomain od = getOpflexDomain(endpoint);
+        if (od != null) {
+            return od.getOpflexAgent(endpoint.getIdentifier());
+        }
+        logger.warn("Couldn't find OpflexConnection for {}", endpoint.getIdentifier());
+        return null;
 
     }
 
-    private void addServers(List<OpflexRpcServer> idMap) {
-        /*
-         * Check to see if there's already a server
-         * with this identity, and if so, close it
-         * and replace it with this one.
-         */
-        for ( OpflexRpcServer srv: idMap ) {
-            OpflexRpcServer server = opflexServers.get(srv.getId());
-            if (server != null) {
-                if ( !server.sameServer(srv)) {
-                    OpflexRpcServer oldServer = opflexServers.remove(srv.getId());
-                    oldServer.getRpcServer().getChannel().disconnect();
-                    launchRpcServer(srv);
-                }
-            }
-            else {
-                launchRpcServer(srv);
-            }
+    /**
+     * Get the OpflexRpcServer that spawned this endpoint.
+     *
+     * @param endpoint The endpoint to look up
+     * @return The OpflexRpcServer that owns this endpoint, or
+     * null if the server no longer exists
+     *
+     * TODO: exception if the endpoint is owned by anything
+     */
+    public OpflexRpcServer getOpflexServer(JsonRpcEndpoint endpoint) {
+        if (endpoint.getContext() instanceof OpflexRpcServer) {
+            return (OpflexRpcServer)endpoint.getContext();
         }
+        logger.warn("Couldn't find OpflexConnection for endpoint {}",
+                endpoint.getIdentifier());
+        return null;
     }
 
-    private void dropServers(List<String> oldServers) {
-        OpflexRpcServer server;
 
-        /*
-         * Check to see if there's already a server
-         * with this identity, and if so, close it
-         * and replace it with this one.
-         */
-        for (String identity: oldServers) {
-            if (opflexServers.containsKey(identity)) {
-                server = opflexServers.remove(identity);
-                server.getRpcServer().getChannel().disconnect();
-            }
-        }
-    }
-
-    public void startOpflexManager() {
-        opflexAgents = new ConcurrentHashMap<String, OpflexConnection>();
-        opflexServers = new ConcurrentHashMap<String, OpflexRpcServer>();
+    public void start() {
+        opflexDomains = new ConcurrentHashMap<String, OpflexDomain>();
         brokerMap = new ConcurrentHashMap<String, List<RpcCallback>>();
 
         /*
@@ -434,22 +344,8 @@ public class OpflexConnectionService
          * Set up the messages supported by each OpFlex policy
          * component
          */
-        policyRepositoryMessages = new ArrayList<RpcMessage>();
-        endpointRegistryMessages = new ArrayList<RpcMessage>();
-        observerMessages = new ArrayList<RpcMessage>();
-
-        IdentityRequest idRequest = new IdentityRequest();
-        policyRepositoryMessages.add(idRequest);
-        endpointRegistryMessages.add(idRequest);
-        observerMessages.add(idRequest);
-
         /* this class implements identity handlers */
-        subscribe(idRequest, this);
-
-        IdentityResponse idResponse = new IdentityResponse();
-        policyRepositoryMessages.add(idResponse);
-        endpointRegistryMessages.add(idResponse);
-        observerMessages.add(idResponse);
+        subscribe(new IdentityRequest(), this);
 
         initializeServers();
     }
@@ -459,84 +355,139 @@ public class OpflexConnectionService
      * connections and servers.
      */
     public void stopping() {
-        for (OpflexConnection connection : opflexAgents.values()) {
-            connection.getEndpoint().getChannel().disconnect();
-        }
-        for (OpflexRpcServer server : opflexServers.values() ) {
-            if (server.getRpcServer().getChannel() != null) {
-                server.getRpcServer().getChannel().disconnect();
-            }
+        for (OpflexDomain d : opflexDomains.values()) {
+            d.cleanup();
         }
     }
 
-    /**
-     * Remove the OpFlex connection/agent from the map
-     *
-     * @param identifier The identity of the connection that was closed
-     */
-    public void removeConnection(String identifier) {
-        opflexAgents.remove(identifier);
-    }
 
-    /**
-     * Add a server with the given identity
-     *
-     * @param identity The IP address/socket pair for the server
-     * @param server The instantiated server
-     */
-    public void addServer(String identity, OpflexRpcServer server) {
-        opflexServers.put(identity, server);
+    private void deleteDomain(String domain) {
+
+        OpflexDomain od = opflexDomains.remove(domain);
+        if (od != null) {
+            od.cleanup();
+        }
     }
 
     /**
-     * Implemented from the AutoCloseable interface.
+     * Close the connection service. Implemented from the
+     * AutoCloseable interface.
      */
      @Override
      public void close() throws ExecutionException, InterruptedException {
 
+         executor.shutdownNow();
+
          if (dataProvider != null) {
              WriteTransaction t = dataProvider.newWriteOnlyTransaction();
-             t.delete(LogicalDatastoreType.CONFIGURATION, DISCOVERY_DEFINITIONS_IID);
+             t.delete(LogicalDatastoreType.CONFIGURATION, DOMAINS_IID);
              t.commit().get();
          }
      }
 
-    @Override
-    public void onDataChanged( final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject>change) {
-
-        List<String> addList = new ArrayList<String>();
-        List <String> dropList = new ArrayList<String>();
-
-        /* Get the new list of configured servers */
-        List<OpflexRpcServer> serverList = createServerList();
+     private void readConfig() {
+         ListenableFuture<Optional<Domains>> dao =
+                 dataProvider.newReadOnlyTransaction()
+                     .read(LogicalDatastoreType.CONFIGURATION, DOMAINS_IID);
+         Futures.addCallback(dao, new FutureCallback<Optional<Domains>>() {
+             @Override
+             public void onSuccess(final Optional<Domains> result) {
+                 if (!result.isPresent()) {
+                     logger.warn("No result!!!");
+                     return;
+                 }
+                 getNewConfig(result);
+             }
+
+             @Override
+             public void onFailure(Throwable t) {
+                 logger.error("Failed to read configuration", t);
+             }
+         }, executor);
+     }
 
-        /*
-         * Create a list of new servers by skipping any servers in the
-         * list that are already configured (i.e. same IP/socket and set
-         * of roles) -- no need to take them down
-         */
-        for ( OpflexRpcServer srv : serverList ) {
-            OpflexRpcServer s = opflexServers.get(srv.getId());
-            if (s != null && s.getRoles().containsAll(srv.getRoles())) {
-                continue;
-            }
-            addList.add(srv.getId());
-        }
+     private void getNewConfig(final Optional<Domains> result) {
+
+         List<String> currentDomains = new ArrayList<String>(opflexDomains.keySet());
+         List<String> newDomains = new ArrayList<String>();
+         List<String> addList = new ArrayList<String>();
+         List <String> dropList = new ArrayList<String>();
+         List <String> updateList = new ArrayList<String>();
+
+         if (result.get() instanceof Domains) {
+
+             /*
+              * Get the new list of domains from the
+              * configuration store, and convert to a
+              * list of the actual domain names for list
+              * manipulation
+              */
+             Domains domains = (Domains)result.get();
+
+             domainList = domains.getDomain();
+             for (Domain domainObj : domainList) {
+                 newDomains.add(domainObj.getId());
+             }
+
+             logger.warn("Current domains {}", currentDomains);
+             /*
+              * Find out what's changed at the domain level.
+              * Classify as additions, deletions, and updates
+              */
+             addList = new ArrayList<String>(newDomains);
+             dropList = new ArrayList<String>(currentDomains);
+             updateList = new ArrayList<String>(newDomains);
+             addList.removeAll(currentDomains);
+             dropList.removeAll(newDomains);
+             updateList.removeAll(addList);
+
+             /*
+              * Drop domains that were removed, along with all
+              * of their servers and connections
+              */
+             for (String d : dropList) {
+                 deleteDomain(d);
+             }
+
+             /*
+              * These are entirely new domains -- get the
+              * information for each new domain and configure
+              */
+             for (String d : addList) {
+                 OpflexDomain od = new OpflexDomain();
+                 od.setDomain(d);
+                 opflexDomains.put(od.getDomain(), od );
+
+                 /* Spawn the servers for this domain */
+                 for (Domain dl : domainList) {
+                     if (dl.getId().equals(d)) {
+                         od.addServers(createServerList(od, dl));
+                         break;
+                     }
+                 }
+             }
+
+             /*
+              * These are domains with updates
+              */
+             for (String d : updateList) {
+                 OpflexDomain od = opflexDomains.get(d);
+                 for (Domain domainObj : domainList) {
+                     if (domainObj.getId().equals(d)) {
+                         logger.warn("updateServers");
+                         od.updateServers(createServerList(od, domainObj));
+                         break;
+                     }
+                 }
+             }
+         }
+     }
 
-        /*
-         * We need to find out if there are any servers that
-         * we have to drop. This is the set of servers that
-         * are already running but don't appear in the configured
-         * list. This just requires a check against the IP/port
-         * (i.e. no need to check role).
-         */
-        Set <String> dropSet = opflexServers.keySet();
-        dropSet.removeAll(addList);
-        dropList.addAll(dropSet);
+     @Override
+    public void onDataChanged( final AsyncDataChangeEvent<InstanceIdentifier<?>,
+            DataObject>change) {
 
-        /* remove deleted servers first */
-        dropServers(dropList);
-        addServers(serverList);
+        readConfig();
     }
 
     @Override
@@ -574,6 +525,14 @@ public class OpflexConnectionService
     @Override
     public void callback(JsonRpcEndpoint endpoint, RpcMessage message) {
 
+        if (!(message instanceof IdentityRequest)) {
+            logger.warn("message is not identity request {}", message);
+            return;
+        }
+        OpflexRpcServer srv = getOpflexServer(endpoint);
+        if (srv == null) return;
+
+        IdentityRequest request = (IdentityRequest)message;
         IdentityResponse.Result result = new IdentityResponse.Result();
 
         List<IdentityResponse.Peer> peers =
@@ -582,30 +541,14 @@ public class OpflexConnectionService
         IdentityResponse response = new IdentityResponse();
 
         /*
-         * We find our role by matching the parent Channel (couldn't
-         * come up with an easier way to do this, as we're trying to
-         * match against the configured identity -- decided against
-         * using the channel's connection b/c things like wildcard
-         * addresses make this comparison tricky). There's also a
-         * minute possibility that the parent socket has been deleted
-         * (e.g. due to reconfiguration) in which case, the peers list
-         * will provide the updated information.
+         *  We inherit our role from the server that spawned
+         *  the connection.
          */
-        OpflexRpcServer srv = null;
         List<String> myRoles = new ArrayList<String>();
-        List<OpflexRpcServer> servers =
-                new ArrayList<OpflexRpcServer>(opflexServers.values());
-        for (OpflexRpcServer server : servers) {
-            if (server.getRpcServer().getChannel() == endpoint.getChannel().parent()) {
-                /* this is our server */
-                List<Role> roles = server.getRoles();
-                if (roles != null) {
-                    for ( Role r : roles ) {
-                        myRoles.add(r.toString());
-                    }
-                }
-                srv = server;
-                break;
+        List<Role> roles = srv.getRoles();
+        if (roles != null) {
+            for ( Role r : roles ) {
+                myRoles.add(r.toString());
             }
         }
         result.setMy_role(myRoles);
@@ -613,23 +556,37 @@ public class OpflexConnectionService
         /*
          * The peers field contains the identifiers other than my_role
          */
-        for (OpflexRpcServer server : servers) {
-            /* Skip our server -- reported in my_role */
-            if ( Objects.equals(server.getId(), srv.getId()))
-                continue;
-            List<Role> roles = server.getRoles();
-            if (roles != null) {
-                for ( Role r : roles ) {
-                    IdentityResponse.Peer peer = new IdentityResponse.Peer();
-                    peer.setConnectivity_info(server.getId());
-                    peer.setRole(r.toString());
-                    peers.add(peer);
+        OpflexDomain od = getOpflexDomain(endpoint);
+        if (request.getParams() == null || request.getParams().size() <= 0) {
+            return;
+        }
+
+        if (!request.getDomain().equals(od.getDomain())) {
+            IdentityResponse.Error error = new IdentityResponse.Error();
+            error.setMessage(INVALID_DOMAIN);
+            response.setError(error);
+            /* send domain mismatch */
+        }
+        else {
+            for (OpflexRpcServer server : od.getOpflexServerList()) {
+                /* Skip our server -- reported in my_role */
+                if ( Objects.equals(server.getId(), srv.getId()))
+                    continue;
+                roles = server.getRoles();
+                if (roles != null) {
+                    for ( Role r : roles ) {
+                        IdentityResponse.Peer peer = new IdentityResponse.Peer();
+                        peer.setConnectivity_info(server.getId());
+                        peer.setRole(r.toString());
+                        peers.add(peer);
+                    }
                 }
             }
+            result.setPeers(peers);
+            result.setName(srv.getId());
+            result.setDomain(od.getDomain());
+            response.setResult(result);
         }
-        result.setPeers(peers);
-        response.setResult(result);
-
         response.setId(message.getId());
 
         /*
@@ -645,29 +602,64 @@ public class OpflexConnectionService
 
     @Override
     public void addConnection(JsonRpcEndpoint endpoint) {
-        List<Role> roles = new ArrayList<Role>();
-        OpflexConnection agent = new OpflexConnection();
-        agent.setEndpoint(endpoint);
-        agent.setIdentity(endpoint.getIdentifier());
 
-        if (endpoint.supportsMessages(policyRepositoryMessages)) {
-            roles.add(Role.POLICY_REPOSITORY);
-        }
-        if (endpoint.supportsMessages(endpointRegistryMessages)) {
-            roles.add(Role.ENDPOINT_REGISTRY);
-        }
-        if (endpoint.supportsMessages(observerMessages)) {
-            roles.add(Role.OBSERVER);
+        /*
+         * When the connection is added, we don't have a context
+         * other than the JsonRpcEndpoint. We use the context
+         * field to store the server object that created this
+         * connection, and can look up things like the domain,
+         * etc. to create the containing connection object.
+         */
+        if (!(endpoint.getContext() instanceof OpflexRpcServer)) {
+            logger.error("Connection for endpoint {} invalid",
+                    endpoint.getIdentifier());
+            // TODO: close connection?
+            return;
         }
-        agent.setRoles(roles);
+
+        OpflexRpcServer server = (OpflexRpcServer)endpoint.getContext();
+        OpflexDomain domain = server.getDomain();
+
+
+        /*
+         * This is the notification when a new endpoint
+         * has been created. Since the endpoint is new,
+         * we don't have a OpflexConnection for it yet. We
+         * create the OpflexConnection, then get the
+         * OpflexRpcServer to set some of the fields
+         * we need (domain, server).
+         */
+        OpflexAgent oc = new OpflexAgent();
+        oc.setEndpoint(endpoint);
+        oc.setIdentity(endpoint.getIdentifier());
+        oc.setDomain(domain.getDomain());
+        oc.setOpflexServer(server);
+        oc.setRoles(server.getRoles());
+
+        /*
+         * The OpFlex domain is determined by the server socket
+         * that the agent connected to. Look up the OpFlex RPC
+         * server using the server socket.
+         *
+         * It's possible that the server was closed or changed
+         * between the connection establishment and now (race
+         * condition). Treat that as a failure, closing the
+         * connection.
+         */
         logger.warn("Adding agent {}", endpoint.getIdentifier());
-        opflexAgents.put(endpoint.getIdentifier(), agent);
+        domain.addOpflexAgent(oc);
     }
 
     @Override
-    public void channelClosed(JsonRpcEndpoint peer) throws Exception {
-        logger.info("Connection to Node : {} closed", peer.getIdentifier());
-        opflexAgents.remove(peer.getIdentifier());
+    public void channelClosed(JsonRpcEndpoint endpoint) throws Exception {
+        logger.info("Connection to Node : {} closed", endpoint.getIdentifier());
+        OpflexAgent agent = getOpflexConnection(endpoint);
+        if (agent != null) {
+            OpflexDomain od = opflexDomains.get(agent.getDomain());
+            if (od != null) {
+                od.removeOpflexAgent(agent);
+            }
+        }
     }
 
 }
diff --git a/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexDomain.java b/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexDomain.java
new file mode 100644 (file)
index 0000000..6032151
--- /dev/null
@@ -0,0 +1,234 @@
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+
+/**
+ *
+ * An OpFlex domain is a logical grouping of OpFlex entities.
+ * The domain aggregates entites and provides methods so that they
+ * can be looked up or referenced by domain.
+ *
+ * The domain field is only present in the OpFlex Identity request message.
+ *
+ * @author tbachman
+ *
+ */
+public class OpflexDomain {
+    String domain;
+    ConcurrentMap<String, OpflexAgent> opflexAgents = null;
+    ConcurrentMap<String, OpflexRpcServer> opflexServers = null;
+
+    OpflexDomain() {
+        opflexAgents = new ConcurrentHashMap<String, OpflexAgent>();
+        opflexServers = new ConcurrentHashMap<String, OpflexRpcServer>();
+    }
+
+    public String getDomain() {
+        return domain;
+    }
+
+    public void setDomain(String domain) {
+        this.domain = domain;
+    }
+
+    public ConcurrentMap<String, OpflexAgent> getOpflexAgents() {
+        return opflexAgents;
+    }
+
+    public void setOpflexAgents(
+            ConcurrentMap<String, OpflexAgent> opflexAgents) {
+        this.opflexAgents = opflexAgents;
+    }
+
+    public ConcurrentMap<String, OpflexRpcServer> getOpflexServers() {
+        return opflexServers;
+    }
+
+    public void setOpflexServers(
+            ConcurrentMap<String, OpflexRpcServer> opflexServers) {
+        this.opflexServers = opflexServers;
+    }
+
+    public void removeOpflexAgent(OpflexAgent agent) {
+        opflexAgents.remove(agent.getIdentity());
+    }
+
+    public void removeOpflexServer(OpflexRpcServer server) {
+        opflexServers.remove(server.getId());
+    }
+
+    public List<OpflexRpcServer> getOpflexServerList() {
+        return new ArrayList<OpflexRpcServer>(opflexServers.values());
+    }
+
+    /**
+     * Clean up all the entities contained by this domain. The
+     * connection service also owns these references, so we
+     * provide notifications to the connection service so that
+     * it can clean up as well.
+     */
+    public void cleanup() {
+        List<String> agents = new ArrayList<String>(opflexAgents.keySet());
+        List<String> servers = new ArrayList<String>(opflexServers.keySet());
+        for (String agent : agents) {
+            OpflexAgent conn = opflexAgents.remove(agent);
+            conn.getEndpoint().getChannel().disconnect();
+        }
+        for (String srv : servers) {
+            OpflexRpcServer server = opflexServers.get(srv);
+            if (server.getRpcServer().getChannel() != null) {
+                server.getRpcServer().getChannel().disconnect();
+            }
+        }
+    }
+
+    /**
+     * Add an {@link OpflexAgent} to the domain
+     *
+     * @param agent The agent to add
+     */
+    public void addOpflexAgent(OpflexAgent agent) {
+        opflexAgents.put(agent.getIdentity(), agent);
+    }
+
+    /**
+     * Return the {@link OpflexAgent} associated
+     * with this identity
+     *
+     * @param identity A string representing the connections identity
+     * @return The connection represented by that key, or null if not found
+     */
+    public OpflexAgent getOpflexAgent(String identity) {
+        return opflexAgents.get(identity);
+    }
+
+    /**
+     * Add the List of servers to the domain
+     *
+     * @param serverList List of new servers to start
+     */
+    public void addServers(List<OpflexRpcServer> serverList) {
+
+        if (serverList == null) return;
+
+        /*
+         * Check to see if there's already a server
+         * with this identity, and if so, close it
+         * and replace it with this one.
+         */
+        for ( OpflexRpcServer srv: serverList ) {
+            OpflexRpcServer server = opflexServers.get(srv.getId());
+            if (server != null) {
+                if ( !server.sameServer(srv)) {
+                    OpflexRpcServer oldServer = opflexServers.remove(srv.getId());
+                    oldServer.getRpcServer().getChannel().disconnect();
+                    opflexServers.put(srv.getId(), srv);
+                    srv.start();
+                }
+            }
+            else {
+                opflexServers.put(srv.getId(), srv);
+                srv.start();
+            }
+        }
+    }
+
+    /**
+     * Drop the list of servers from the domain
+     *
+     * @param oldServers The list of servers to drop
+     *
+     * TODO: Should we provide notifications to or close
+     *       the connections that were spawned by the
+     *       deleted servers?
+     */
+    public void dropServers(List<String> oldServers) {
+        OpflexRpcServer server;
+
+        /*
+         * Check to see if there's already a server
+         * with this identity, and if so, close it
+         * and replace it with this one.
+         */
+        for (String srv: oldServers) {
+            if (opflexServers.containsKey(srv)) {
+                server = opflexServers.remove(srv);
+                server.getRpcServer().getChannel().disconnect();
+            }
+        }
+    }
+
+    /**
+     * Check the new configuration of the servers against the
+     * existing, and if different, delete the old server and
+     * replace it with a new server running the updated parameters.
+     *
+     * @param serverList The new server configurations
+     */
+    public void updateServers(List<OpflexRpcServer> serverList) {
+        /* Get the new list of configured servers in this domain */
+        List<OpflexRpcServer> updateServers = new ArrayList<OpflexRpcServer>();
+        List<OpflexRpcServer> newServers = new ArrayList<OpflexRpcServer>();
+        List<String> newList = new ArrayList<String>();
+
+        for (OpflexRpcServer srv : serverList) {
+            newList.add(srv.getId());
+        }
+
+        /* Get the list of currently configured servers in this domain*/
+        List<String> currentList =
+                new ArrayList<String>(opflexServers.keySet());
+
+        /* Make the add/drop/update lists */
+        List<String> addList = new ArrayList<String>(newList);
+        List<String> dropList = new ArrayList<String>(currentList);
+        List<String> updateList = new ArrayList<String>(newList);
+
+        addList.removeAll(currentList);
+        dropList.removeAll(newList);
+        updateList.removeAll(addList);
+
+        /*
+         * Create a list of new servers by skipping any servers in the
+         * list that are already configured (i.e. same IP/socket and set
+         * of roles) -- no need to take them down
+         */
+        for (OpflexRpcServer srv: serverList) {
+            /*
+             * If this in our update list, check parameters
+             * to see if we really need to update it
+             */
+            if (updateList.contains(srv.getId())) {
+                OpflexRpcServer s = opflexServers.get(srv.getId());
+                if (s != null && s.getRoles().containsAll(srv.getRoles())) {
+                    continue;
+                }
+                updateServers.add(srv);
+
+            }
+            if (addList.contains(srv.getId())) {
+                newServers.add(srv);
+            }
+        }
+
+
+        dropServers(dropList);
+        addServers(newServers);
+        addServers(updateServers);
+    }
+
+
+}
diff --git a/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexRpcServer.java b/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexRpcServer.java
new file mode 100644 (file)
index 0000000..a2e62bb
--- /dev/null
@@ -0,0 +1,155 @@
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.List;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.ConnectionService;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcBroker;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcServer;
+
+/**
+ * The {@link OpflexRpcServer}s respond to OpFlex clients
+ * which create {@link OpflexAgent} objects when they
+ * are established. The servers don't own the connections,
+ * which allows the clients to continue operation even if
+ * the server is closed
+ *
+ * @author tbachman
+ *
+ */
+public class OpflexRpcServer {
+
+    private String identity;
+    private OpflexDomain domain;
+    private List<Role> roles;
+    private RpcServer rpcServer;
+    private ConnectionService connectionService;
+    private RpcBroker rpcBroker;
+
+    private String address;
+    private int port;
+
+    private void parseAndSetIdentity(String identity) {
+        if (identity.split(":").length == 2) {
+            this.identity = identity;
+            this.address = identity.split(":")[0];
+            this.port =  Integer.parseInt(identity.split(":")[1]);
+        }
+    }
+
+    public OpflexRpcServer(OpflexDomain domain, String identity, List<Role> roles) {
+        this.domain = domain;
+        this.roles = roles;
+        parseAndSetIdentity(identity);
+        rpcServer = new RpcServer(address, port);
+        rpcServer.setContext(this);
+    }
+
+    public OpflexDomain getDomain() {
+        return domain;
+    }
+
+    public String getId() {
+        return this.identity;
+    }
+
+    public RpcServer getRpcServer() {
+        return rpcServer;
+    }
+
+    public ConnectionService getConnectionService() {
+        return connectionService;
+    }
+
+    public void setConnectionService(ConnectionService service) {
+        this.connectionService = service;
+    }
+
+    public RpcBroker getRpcBroker() {
+        return this.rpcBroker;
+    }
+
+    public void setRpcBroker(RpcBroker rpcBroker) {
+        this.rpcBroker = rpcBroker;
+    }
+
+    public List<Role> getRoles() {
+        return this.roles;
+    }
+
+    /**
+     * Start the {@link OpflexRpcServer}. This adds the supported
+     * messages to the server, based on the roles that were
+     * configured. It creates an {@link RpcServer} object,
+     * passes it the context owned by the {@link OpflexRpcServer},
+     * and starts the server in its own thread.
+     *
+     * TODO: should use executor service instead?
+     */
+    public void start() {
+        rpcServer.setConnectionService(connectionService);
+        rpcServer.setRpcBroker(rpcBroker);
+
+        for ( Role role : roles ) {
+            rpcServer.addMessageList(role.getMessages());
+        }
+
+        new Thread() {
+            private RpcServer server;
+
+            public Thread initializeServerParams(RpcServer server) {
+                this.server = server;
+                return this;
+            }
+            @Override
+            public void run() {
+                try {
+                    server.start();
+                } catch (Exception e) {
+                }
+            }
+        }.initializeServerParams(rpcServer).start();
+
+    }
+
+    /**
+     * Check to see if two servers are the same. They
+     * need to be in the same Opflex Domain, have the same
+     * identity, and the same roles, or they can be
+     * identical objects. Note that it purposely does
+     * not compare the RpcServer, as the purpose for
+     * this method is to see if there is already a server
+     * fulfilling this configuration (which is the reason
+     * it's a new method, instead of overriding toString).
+     *
+     * @param srv The server to compare against
+     * @return true if they are equivalent
+     */
+    public boolean sameServer(OpflexRpcServer srv) {
+        if (this == srv)
+            return true;
+        if (srv == null)
+            return false;
+        if (!this.identity.equals(srv.identity))
+            return false;
+        if (this.domain == null ||
+                !this.domain.getDomain().equals(srv.getDomain().getDomain()))
+            return false;
+        if (this.roles == null && srv.roles == null)
+            return true;
+        if (this.roles == null || srv.roles == null)
+            return false;
+        if (this.roles.size() == srv.roles.size()
+                && this.roles.containsAll(srv.roles))
+            return true;
+        return false;
+    }
+}
diff --git a/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/Role.java b/groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/Role.java
new file mode 100644 (file)
index 0000000..c314eea
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityResponse;
+
+/**
+ * Enum for OpFlex roles and their supported messages
+ *
+ * @author tbachman
+ *
+ */
+public enum Role {
+    POLICY_REPOSITORY("policy_repository"),
+    ENDPOINT_REGISTRY("endpoint_registry"),
+    OBSERVER("observer"),
+    POLICY_ELEMENT("policy_element");
+
+    static IdentityRequest idReq = new IdentityRequest();
+    static IdentityResponse idRsp = new IdentityResponse();
+
+    private final String role;
+
+    Role(String role) {
+        this.role = role;
+    }
+
+    /**
+     * Get the {@link RpcMessage}s supported by this Role
+     *
+     * @return List of RpcMessages supported for this Role
+     */
+    public List<RpcMessage> getMessages() {
+        if (role.equals(POLICY_REPOSITORY.toString())) {
+            List<RpcMessage> msgList = new ArrayList<RpcMessage>();
+            msgList.add(idReq);
+            msgList.add(idRsp);
+            return msgList;
+        }
+        else if (role.equals(ENDPOINT_REGISTRY.toString())) {
+            List<RpcMessage> msgList = new ArrayList<RpcMessage>();
+            msgList.add(idReq);
+            msgList.add(idRsp);
+            return msgList;
+        }
+        else if (role.equals(OBSERVER.toString())) {
+            List<RpcMessage> msgList = new ArrayList<RpcMessage>();
+            msgList.add(idReq);
+            msgList.add(idRsp);
+            return msgList;
+        }
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return this.role;
+    }
+}
\ No newline at end of file
similarity index 90%
rename from groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/IdentityRequest.java
rename to groupbasedpolicy/src/main/java/org/opendaylight/groupbasedpolicy/renderer/opflex/messages/IdentityRequest.java
index 6a58bde9636e56eb3be2dfd599cc9f98637dab9c..0858fcc72f6cab58d1b85fc6572409256b68fc99 100644 (file)
@@ -7,7 +7,7 @@
  *
  * Authors : Thomas Bachman
  */
-package org.opendaylight.groupbasedpolicy.renderer.opflex;
+package org.opendaylight.groupbasedpolicy.renderer.opflex.messages;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -110,4 +110,11 @@ public class IdentityRequest extends RpcMessage {
     public IdentityRequest() {
         this.name = IDENTITY_MESSAGE;
     }
+
+    public String getDomain() {
+        if (this.params != null && this.params.get(0) != null) {
+            return this.params.get(0).getDomain();
+        }
+        return null;
+    }
 }
index 39d98b09749d7e905bc5b6ef62a97212a4e2c796..9b413f0a8105a4a20ababb6cb34afb27e37d76c5 100644 (file)
@@ -74,29 +74,29 @@ import com.google.common.util.concurrent.ListenableFuture;
 /**
  * The policy resolver is a utility for renderers to help in resolving
  * group-based policy into a form that is easier to apply to the actual network.
- * 
- * <p>For any pair of endpoint groups, there is a set of rules that could apply 
+ *
+ * <p>For any pair of endpoint groups, there is a set of rules that could apply
  * to the endpoints on that group based on the policy configuration.  The exact
- * list of rules that apply to a given pair of endpoints depends on the 
+ * list of rules that apply to a given pair of endpoints depends on the
  * conditions that are active on the endpoints.
- * 
- * In a more formal sense: Let there be endpoint groups G_n, and for each G_n a 
- * set of conditions C_n that can apply to endpoints in G_n.  Further, let S be 
- * the set of lists of rules defined in the policy.  Our policy can be 
- * represented as a function F: (G_n, 2^C_n, G_m, 2^C_m) -> S, where 2^C_n 
- * represents the power set of C_n. In other words, we want to map all the 
- * possible tuples of pairs of endpoints along with their active conditions 
+ *
+ * In a more formal sense: Let there be endpoint groups G_n, and for each G_n a
+ * set of conditions C_n that can apply to endpoints in G_n.  Further, let S be
+ * the set of lists of rules defined in the policy.  Our policy can be
+ * represented as a function F: (G_n, 2^C_n, G_m, 2^C_m) -> S, where 2^C_n
+ * represents the power set of C_n. In other words, we want to map all the
+ * possible tuples of pairs of endpoints along with their active conditions
  * onto the right list of rules to apply.
- * 
- * <p>We need to be able to query against this policy model, enumerate the 
+ *
+ * <p>We need to be able to query against this policy model, enumerate the
  * relevant classes of traffic and endpoints, and notify renderers when there
- * are changes to policy as it applies to active sets of endpoints and 
+ * are changes to policy as it applies to active sets of endpoints and
  * endpoint groups.
- * 
+ *
  * <p>The policy resolver will maintain the necessary state for all tenants
- * in its control domain, which is the set of tenants for which 
+ * in its control domain, which is the set of tenants for which
  * policy listeners have been registered.
- * 
+ *
  * @author readams
  */
 public class PolicyResolver implements AutoCloseable {
@@ -104,16 +104,16 @@ public class PolicyResolver implements AutoCloseable {
 
     private final DataBroker dataProvider;
     private final ScheduledExecutorService executor;
-    
+
     /**
      *  Keep track of the current relevant policy scopes.
      */
     private CopyOnWriteArrayList<PolicyScope> policyListenerScopes;
-    
+
     protected ConcurrentMap<TenantId, TenantContext> resolvedTenants;
-    
+
     private PolicyCache policyCache = new PolicyCache();
-    
+
     public PolicyResolver(DataBroker dataProvider,
                           ScheduledExecutorService executor) {
         super();
@@ -141,28 +141,28 @@ public class PolicyResolver implements AutoCloseable {
     // *************************
 
     /**
-     * Get the policy that currently applies to a pair of endpoints. 
+     * Get the policy that currently applies to a pair of endpoints.
      * with the specified groups and conditions.  The first endpoint acts as
-     * the consumer and the second endpoint acts as the provider, so to get 
+     * the consumer and the second endpoint acts as the provider, so to get
      * all policy related to this pair of endpoints you must call this
      * function twice: once for each possible order of endpoints.
-     * 
-     * @param ep1Tenant the tenant ID for the first endpoint 
-     * @param ep1Group the endpoint group for the first endpoint 
+     *
+     * @param ep1Tenant the tenant ID for the first endpoint
+     * @param ep1Group the endpoint group for the first endpoint
      * @param ep1Conds The conditions that apply to the first endpoint
      * @param ep2Tenant the tenant ID for the second endpoint
-     * @param ep2Group the endpoint group for the second endpoint 
+     * @param ep2Group the endpoint group for the second endpoint
      * @param ep2Conds The conditions that apply to the second endpoint.
      * @return a list of {@link RuleGroup} that apply to the endpoints.
      * Cannot be null, but may be an empty list of rulegroups
      */
     public List<RuleGroup> getPolicy(TenantId ep1Tenant,
-                                     EndpointGroupId ep1Group, 
+                                     EndpointGroupId ep1Group,
                                      ConditionSet ep1Conds,
                                      TenantId ep2Tenant,
-                                     EndpointGroupId ep2Group, 
+                                     EndpointGroupId ep2Group,
                                      ConditionSet ep2Conds) {
-        return policyCache.getPolicy(ep1Tenant, ep1Group, ep1Conds, 
+        return policyCache.getPolicy(ep1Tenant, ep1Group, ep1Conds,
                                      ep2Tenant, ep2Group, ep2Conds);
     }
 
@@ -176,7 +176,7 @@ public class PolicyResolver implements AutoCloseable {
         if (tc == null) return null;
         return tc.tenant.get();
     }
-    
+
     /**
      * Register a listener to receive update events.
      * @param listener the {@link PolicyListener} object to receive the update
@@ -185,33 +185,33 @@ public class PolicyResolver implements AutoCloseable {
     public PolicyScope registerListener(PolicyListener listener) {
         PolicyScope ps = new PolicyScope(this, listener);
         policyListenerScopes.add(ps);
-        
+
         return ps;
     }
-    
+
     /**
      * Remove the listener registered for the given {@link PolicyScope}.
      * @param scope the scope to remove
      * @see PolicyResolver#registerListener(PolicyListener)
      */
     public void removeListener(PolicyScope scope) {
-        policyListenerScopes.remove(scope);        
+        policyListenerScopes.remove(scope);
     }
 
     // **************
     // Implementation
     // **************
-    
+
     /**
      * Notify the policy listeners about a set of updated consumers
      */
     private void notifyListeners(Set<EgKey> updatedConsumers) {
         for (final PolicyScope scope : policyListenerScopes) {
-            Set<EgKey> filtered = 
+            Set<EgKey> filtered =
                     Sets.filter(updatedConsumers, new Predicate<EgKey>() {
                         @Override
                         public boolean apply(EgKey input) {
-                            return scope.contains(input.getTenantId(), 
+                            return scope.contains(input.getTenantId(),
                                                   input.getEgId());
                         }
                     });
@@ -220,7 +220,7 @@ public class PolicyResolver implements AutoCloseable {
             }
         }
     }
-    
+
     /**
      * Subscribe the resolver to updates related to a particular tenant
      * Make sure that this can't be called concurrently with subscribe
@@ -253,13 +253,13 @@ public class PolicyResolver implements AutoCloseable {
             if (dataProvider != null) {
                  registration = dataProvider
                          .registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
-                                                     TenantUtils.tenantIid(tenantId), 
+                                                     TenantUtils.tenantIid(tenantId),
                                                      new PolicyChangeListener(tenantId),
                                                      DataChangeScope.SUBTREE);
             }
 
             context = new TenantContext(registration);
-            TenantContext oldContext = 
+            TenantContext oldContext =
                     resolvedTenants.putIfAbsent(tenantId, context);
             if (oldContext != null) {
                 // already registered in a different thread; just use the other
@@ -272,16 +272,16 @@ public class PolicyResolver implements AutoCloseable {
         // Resolve the new tenant and update atomically
         final AtomicReference<IndexedTenant> tenantRef = context.tenant;
         final IndexedTenant ot = tenantRef.get();
-        ReadOnlyTransaction transaction = 
+        ReadOnlyTransaction transaction =
                 dataProvider.newReadOnlyTransaction();
         InstanceIdentifier<Tenant> tiid = TenantUtils.tenantIid(tenantId);
-        ListenableFuture<Optional<DataObject>> unresolved;
+        ListenableFuture<Optional<Tenant>> unresolved;
 
         unresolved = transaction.read(LogicalDatastoreType.CONFIGURATION, tiid);
-        
-        Futures.addCallback(unresolved, new FutureCallback<Optional<DataObject>>() {
+
+        Futures.addCallback(unresolved, new FutureCallback<Optional<Tenant>>() {
             @Override
-            public void onSuccess(Optional<DataObject> result) {
+            public void onSuccess(Optional<Tenant> result) {
                 if (!result.isPresent()) return;
 
                 Tenant t = InheritanceUtils.resolveTenant((Tenant)result.get());
@@ -291,8 +291,8 @@ public class PolicyResolver implements AutoCloseable {
                     updateTenant(tenantId);
                 } else {
                     // Update the policy cache and notify listeners
-                    Table<EgKey, EgKey, Policy> policy = resolvePolicy(t);        
-                    Set<EgKey> updatedConsumers = 
+                    Table<EgKey, EgKey, Policy> policy = resolvePolicy(t);
+                    Set<EgKey> updatedConsumers =
                             policyCache.updatePolicy(policy, policyListenerScopes);
 
                     notifyListeners(updatedConsumers);
@@ -305,65 +305,65 @@ public class PolicyResolver implements AutoCloseable {
             }
         }, executor);
     }
-    
-    
+
+
     /**
      * Resolve the policy in three phases:
-     * (1) select contracts that in scope based on contract selectors. 
+     * (1) select contracts that in scope based on contract selectors.
      * (2) select subjects that are in scope for each contract based on
      * matchers in clauses
      * (3) resolve the set of in-scope contracts into a list of subjects that
-     * apply for each pair of endpoint groups and the conditions that can 
+     * apply for each pair of endpoint groups and the conditions that can
      * apply for for each endpoint in those groups.
      */
     protected Table<EgKey, EgKey, Policy> resolvePolicy(Tenant t) {
         // select contracts that apply for the given tenant
         Table<EgKey, EgKey, List<ContractMatch>> contractMatches =
                 selectContracts(t);
-        
+
         // select subjects for the matching contracts and resolve the policy
         // for endpoint group pairs.  This does phase (2) and (3) as one step
         return selectSubjects(contractMatches);
     }
-    
+
     /**
      * Choose the contracts that are in scope for each pair of endpoint
      * groups, then perform subject selection for the pair
      */
-    protected Table<EgKey, EgKey, List<ContractMatch>> 
+    protected Table<EgKey, EgKey, List<ContractMatch>>
         selectContracts(Tenant tenant) {
-        // For each endpoint group, match consumer selectors 
+        // For each endpoint group, match consumer selectors
         // against contracts to get a set of matching consumer selectors
-        Table<TenantId, ContractId, List<ConsumerContractMatch>> consumerMatches = 
+        Table<TenantId, ContractId, List<ConsumerContractMatch>> consumerMatches =
                 HashBasedTable.create();
         if (tenant.getEndpointGroup() == null) return HashBasedTable.create();
         for (EndpointGroup group : tenant.getEndpointGroup()) {
-            List<ConsumerContractMatch> r = 
+            List<ConsumerContractMatch> r =
                     matchConsumerContracts(tenant, group);
             for (ConsumerContractMatch ccm : r) {
-                List<ConsumerContractMatch> cms = 
-                        consumerMatches.get(tenant.getId(), 
+                List<ConsumerContractMatch> cms =
+                        consumerMatches.get(tenant.getId(),
                                             ccm.contract.getId());
                 if (cms == null) {
                     cms = new ArrayList<>();
-                    consumerMatches.put(tenant.getId(), 
+                    consumerMatches.put(tenant.getId(),
                                         ccm.contract.getId(), cms);
                 }
                 cms.add(ccm);
             }
         }
-        
+
         // Match provider selectors, and check each match for a corresponding
         // consumer selector match.
-        Table<EgKey, EgKey, List<ContractMatch>> contractMatches = 
+        Table<EgKey, EgKey, List<ContractMatch>> contractMatches =
                 HashBasedTable.create();
         for (EndpointGroup group : tenant.getEndpointGroup()) {
-            List<ContractMatch> matches = 
+            List<ContractMatch> matches =
                     matchProviderContracts(tenant, group, consumerMatches);
             for (ContractMatch cm : matches) {
-                EgKey consumerKey = new EgKey(cm.consumerTenant.getId(), 
+                EgKey consumerKey = new EgKey(cm.consumerTenant.getId(),
                                               cm.consumer.getId());
-                EgKey providerKey = new EgKey(cm.providerTenant.getId(), 
+                EgKey providerKey = new EgKey(cm.providerTenant.getId(),
                                               cm.provider.getId());
                 List<ContractMatch> egPairMatches =
                         contractMatches.get(consumerKey, providerKey);
@@ -378,14 +378,14 @@ public class PolicyResolver implements AutoCloseable {
         }
         return contractMatches;
     }
-    
+
     private boolean clauseMatches(Clause clause, ContractMatch match) {
         if (clause.getConsumerMatchers() != null) {
-            List<RequirementMatcher> reqMatchers = 
+            List<RequirementMatcher> reqMatchers =
                     clause.getConsumerMatchers().getRequirementMatcher();
             if (reqMatchers != null) {
                 for (RequirementMatcher reqMatcher : reqMatchers) {
-                    if (!MatcherUtils.applyReqMatcher(reqMatcher, 
+                    if (!MatcherUtils.applyReqMatcher(reqMatcher,
                                                       match.consumerRelator)) {
                         return false;
                     }
@@ -393,11 +393,11 @@ public class PolicyResolver implements AutoCloseable {
             }
         }
         if (clause.getProviderMatchers() != null) {
-            List<CapabilityMatcher> capMatchers = 
+            List<CapabilityMatcher> capMatchers =
                     clause.getProviderMatchers().getCapabilityMatcher();
             if (capMatchers != null) {
                 for (CapabilityMatcher capMatcher : capMatchers) {
-                    if (!MatcherUtils.applyCapMatcher(capMatcher, 
+                    if (!MatcherUtils.applyCapMatcher(capMatcher,
                                                       match.providerRelator)) {
                         return false;
                     }
@@ -412,7 +412,7 @@ public class PolicyResolver implements AutoCloseable {
 
         ImmutableSet.Builder<ConditionName> allb = ImmutableSet.builder();
         ImmutableSet.Builder<ConditionName> noneb = ImmutableSet.builder();
-        ImmutableSet.Builder<Set<ConditionName>> anyb = 
+        ImmutableSet.Builder<Set<ConditionName>> anyb =
                 ImmutableSet.builder();
         for (ConditionMatcher condMatcher : condMatchers) {
             if (condMatcher.getCondition() == null)
@@ -420,13 +420,13 @@ public class PolicyResolver implements AutoCloseable {
             MatchType type = condMatcher.getMatchType();
             if (type == null) type = MatchType.All;
             if (type.equals(MatchType.Any)) {
-                ImmutableSet.Builder<ConditionName> a = 
+                ImmutableSet.Builder<ConditionName> a =
                         ImmutableSet.builder();
                 for (Condition c : condMatcher.getCondition()) {
                     a.add(c.getName());
                 }
                 anyb.add(a.build());
-            } else { 
+            } else {
                 for (Condition c : condMatcher.getCondition()) {
                     switch (type) {
                     case Any:
@@ -444,7 +444,7 @@ public class PolicyResolver implements AutoCloseable {
         }
         return new ConditionSet(allb.build(), noneb.build(), anyb.build());
     }
-    
+
     private ConditionSet buildConsConditionSet(Clause clause) {
         if (clause.getConsumerMatchers() != null) {
             List<ConditionMatcher> condMatchers =
@@ -462,20 +462,20 @@ public class PolicyResolver implements AutoCloseable {
         }
         return ConditionSet.EMPTY;
     }
-    
+
     private Policy resolvePolicy(Tenant contractTenant,
                                  Contract contract,
                                  Policy merge,
                                  Table<ConditionSet, ConditionSet, List<Subject>> subjectMap) {
-        Table<ConditionSet, ConditionSet, List<RuleGroup>> ruleMap = 
+        Table<ConditionSet, ConditionSet, List<RuleGroup>> ruleMap =
                 HashBasedTable.create();
         if (merge != null) {
             ruleMap.putAll(merge.ruleMap);
         }
-        for (Cell<ConditionSet, ConditionSet, List<Subject>> entry : 
+        for (Cell<ConditionSet, ConditionSet, List<Subject>> entry :
                 subjectMap.cellSet()) {
             List<RuleGroup> rules = new ArrayList<>();
-            List<RuleGroup> oldrules = 
+            List<RuleGroup> oldrules =
                     ruleMap.get(entry.getRowKey(), entry.getColumnKey());
             if (oldrules != null) {
                 rules.addAll(oldrules);
@@ -491,18 +491,18 @@ public class PolicyResolver implements AutoCloseable {
                 rules.add(rg);
             }
             Collections.sort(rules);
-            ruleMap.put(entry.getRowKey(), entry.getColumnKey(), 
+            ruleMap.put(entry.getRowKey(), entry.getColumnKey(),
                         Collections.unmodifiableList(rules));
         }
         return new Policy(ruleMap);
     }
-    
+
     /**
-     * Choose the set of subjects that in scope for each possible set of 
+     * Choose the set of subjects that in scope for each possible set of
      * endpoint conditions
      */
-    protected Table<EgKey, EgKey, Policy> 
-            selectSubjects(Table<EgKey, EgKey, 
+    protected Table<EgKey, EgKey, Policy>
+            selectSubjects(Table<EgKey, EgKey,
                                  List<ContractMatch>> contractMatches) {
         // Note that it's possible to further simplify the resulting policy
         // in the case of things like repeated rules, condition sets that
@@ -517,7 +517,7 @@ public class PolicyResolver implements AutoCloseable {
 
                 List<Subject> subjectList = match.contract.getSubject();
                 if (subjectList == null) continue;
-                
+
                 EgKey ckey = new EgKey(match.consumerTenant.getId(),
                                        match.consumer.getId());
                 EgKey pkey = new EgKey(match.providerTenant.getId(),
@@ -536,21 +536,21 @@ public class PolicyResolver implements AutoCloseable {
                     }
                     if (alreadyMatched) continue;
                 }
-                
+
                 HashMap<SubjectName, Subject> subjects = new HashMap<>();
                 for (Subject s : subjectList) {
                     subjects.put(s.getName(), s);
                 }
-                
-                Table<ConditionSet, ConditionSet, List<Subject>> subjectMap = 
+
+                Table<ConditionSet, ConditionSet, List<Subject>> subjectMap =
                         HashBasedTable.create();
-                
+
                 for (Clause clause : clauses) {
                     if (clause.getSubjectRefs() != null &&
                         clauseMatches(clause, match)) {
                         ConditionSet consCSet = buildConsConditionSet(clause);
                         ConditionSet provCSet = buildProvConditionSet(clause);
-                        List<Subject> clauseSubjects = 
+                        List<Subject> clauseSubjects =
                                 subjectMap.get(consCSet, provCSet);
                         if (clauseSubjects == null) {
                             clauseSubjects = new ArrayList<>();
@@ -563,17 +563,17 @@ public class PolicyResolver implements AutoCloseable {
                     }
                 }
 
-                policy.put(ckey, pkey, 
-                           resolvePolicy(match.contractTenant, 
+                policy.put(ckey, pkey,
+                           resolvePolicy(match.contractTenant,
                                          match.contract,
-                                         existing, 
+                                         existing,
                                          subjectMap));
             }
         }
-        
+
         return policy;
     }
-    
+
     private List<ConsumerContractMatch> matchConsumerContracts(Tenant tenant,
                                                                EndpointGroup consumer) {
         List<ConsumerContractMatch> matches = new ArrayList<>();
@@ -581,11 +581,11 @@ public class PolicyResolver implements AutoCloseable {
             for (ConsumerNamedSelector cns : consumer.getConsumerNamedSelector()) {
                 if (cns.getContract() == null) continue;
                 for (ContractId contractId : cns.getContract()) {
-                    Contract contract = 
+                    Contract contract =
                             TenantUtils.findContract(tenant, contractId);
                     if (contract == null) continue;
-                    matches.add(new ConsumerContractMatch(tenant, contract, 
-                                                          tenant, consumer, 
+                    matches.add(new ConsumerContractMatch(tenant, contract,
+                                                          tenant, consumer,
                                                           cns));
                 }
             }
@@ -606,10 +606,10 @@ public class PolicyResolver implements AutoCloseable {
                             }
                         }
                         if (match) {
-                            matches.add(new ConsumerContractMatch(tenant, 
-                                                                  contract, 
-                                                                  tenant, 
-                                                                  consumer, 
+                            matches.add(new ConsumerContractMatch(tenant,
+                                                                  contract,
+                                                                  tenant,
+                                                                  consumer,
                                                                   cts));
                         }
                     }
@@ -620,7 +620,7 @@ public class PolicyResolver implements AutoCloseable {
 //        for (ConsumerTargetSelector cts : consumer.getConsumerTargetSelector()) {
 //            if (tenant.getContractRef() == null) continue;
 //            for (ContractRef c : tenant.getContractRef()) {
-//                
+//
 //            }
 //        }
         return matches;
@@ -628,18 +628,18 @@ public class PolicyResolver implements AutoCloseable {
 
     private void amendContractMatches(List<ContractMatch> matches,
                                       List<ConsumerContractMatch> cMatches,
-                                      Tenant tenant, EndpointGroup provider, 
+                                      Tenant tenant, EndpointGroup provider,
                                       ProviderSelectionRelator relator) {
         if (cMatches == null) return;
         for (ConsumerContractMatch cMatch : cMatches) {
             matches.add(new ContractMatch(cMatch, tenant, provider, relator));
         }
     }
-    
-    private List<ContractMatch> 
+
+    private List<ContractMatch>
         matchProviderContracts(Tenant tenant, EndpointGroup provider,
-                               Table<TenantId, 
-                                     ContractId, 
+                               Table<TenantId,
+                                     ContractId,
                                      List<ConsumerContractMatch>> consumerMatches) {
         List<ContractMatch> matches = new ArrayList<>();
         if (provider.getProviderNamedSelector() != null) {
@@ -648,7 +648,7 @@ public class PolicyResolver implements AutoCloseable {
                 for (ContractId contractId : pns.getContract()) {
                     Contract c = TenantUtils.findContract(tenant, contractId);
                     if (c == null) continue;
-                    List<ConsumerContractMatch> cMatches = 
+                    List<ConsumerContractMatch> cMatches =
                             consumerMatches.get(tenant.getId(), c.getId());
                     amendContractMatches(matches, cMatches, tenant, provider, pns);
                 }
@@ -670,10 +670,10 @@ public class PolicyResolver implements AutoCloseable {
                             }
                         }
                         if (match) {
-                            List<ConsumerContractMatch> cMatches = 
-                                    consumerMatches.get(tenant.getId(), 
+                            List<ConsumerContractMatch> cMatches =
+                                    consumerMatches.get(tenant.getId(),
                                                         c.getId());
-                            amendContractMatches(matches, cMatches, tenant, 
+                            amendContractMatches(matches, cMatches, tenant,
                                                  provider, pts);
 
                         }
@@ -688,13 +688,13 @@ public class PolicyResolver implements AutoCloseable {
         ListenerRegistration<DataChangeListener> registration;
 
         AtomicReference<IndexedTenant> tenant = new AtomicReference<>();
-        
+
         public TenantContext(ListenerRegistration<DataChangeListener> registration) {
             super();
             this.registration = registration;
         }
     }
-    
+
     /**
      * Represents a selected contract made by endpoint groups matching it
      * using selection relators.  This is the result of the contract selection
@@ -708,12 +708,12 @@ public class PolicyResolver implements AutoCloseable {
          * The tenant ID of the provider endpoint group
          */
         final Tenant providerTenant;
-        
+
         /**
          * The provider endpoint group
          */
         final EndpointGroup provider;
-        
+
         /**
          * The provider selection relator that was used to match the contract
          */
@@ -722,10 +722,10 @@ public class PolicyResolver implements AutoCloseable {
         public ContractMatch(ConsumerContractMatch consumerMatch,
                              Tenant providerTenant, EndpointGroup provider,
                              ProviderSelectionRelator providerRelator) {
-            super(consumerMatch.contractTenant, 
-                  consumerMatch.contract, 
+            super(consumerMatch.contractTenant,
+                  consumerMatch.contract,
                   consumerMatch.consumerTenant,
-                  consumerMatch.consumer, 
+                  consumerMatch.consumer,
                   consumerMatch.consumerRelator);
             this.providerTenant = providerTenant;
             this.provider = provider;
@@ -739,7 +739,7 @@ public class PolicyResolver implements AutoCloseable {
          * The tenant of the matching contract
          */
         final Tenant contractTenant;
-        
+
         /**
          * The matching contract
          */
@@ -749,17 +749,17 @@ public class PolicyResolver implements AutoCloseable {
          * The tenant for the endpoint group
          */
         final Tenant consumerTenant;
-        
+
         /**
          * The consumer endpoint group
          */
         final EndpointGroup consumer;
-        
+
         /**
          * The consumer selection relator that was used to match the contract
          */
         final ConsumerSelectionRelator consumerRelator;
-        
+
 
         public ConsumerContractMatch(Tenant contractTenant,
                                      Contract contract,
@@ -778,7 +778,7 @@ public class PolicyResolver implements AutoCloseable {
     @Immutable
     private class PolicyChangeListener implements DataChangeListener {
         final TenantId tenantId;
-        
+
         public PolicyChangeListener(TenantId tenantId) {
             super();
             this.tenantId = tenantId;
@@ -786,9 +786,9 @@ public class PolicyResolver implements AutoCloseable {
 
         @Override
         public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> arg0) {
-            updateTenant(tenantId);            
+            updateTenant(tenantId);
         }
-        
+
     }
 
 }
index 43bf9e3464a41f9df2f230c6621875255fd483df..5f8c03c57bc9b5c17425350dfd494d595fbacb0c 100644 (file)
@@ -44,92 +44,113 @@ module opflex {
         }
     }
 
-    // ******************
-    // Configuration Data
-    // ******************
-    container discovery-definitions {
+    container domains {
         description
-            "The nodes that any OpFlex agent needs to communicate
-             with in an OpFlex policy fabric.";
+            "The list of all OpFlex domains.";
 
-        list policy-repository {
-            description
-                "A repository that the OpFlex agent can use for resolving
-                 policies.";
-
-            key "id";
+        config true;
 
-            leaf id {
-                description 
-                   "The id for the policy repository.";
-                type string;
-                config true;
-            }
-            leaf port {
-                description 
-                   "The port number to use for the connection";
-                type int32;
-                config true;
-            }
-            leaf serialization-type {
-                description 
-                    "The serialization to use for this connection.";
-                type serialization;
-                config true;
-            }
-        }
-
-        list endpoint-registry {
+        list domain {
             description
-                "A repository that the OpFlex agent can use for registration
-                 and lookup of endpoints.";
+                "An administrative domain for OpFlex entities.";
 
             key "id";
 
             leaf id {
                 description 
-                   "The id for the endpoint registry.";
+                    "The id for the domain.";
                 type string;
                 config true;
             }
-            leaf port {
-                description 
-                   "The port number to use for the connection";
-                type int32;
-                config true;
-            }
-            leaf serialization-type {
-                description 
-                    "The serialization to use for this connection.";
-                type serialization;
-                config true;
-            }
-        }
-
-        list observer {
-            description
-                "A repository that the OpFlex agent can send State Report
-                 messages to.";
-
-            key "id";
 
-            leaf id {
-                description 
-                   "The id for the Observer.";
-                type string;
-                config true;
-            }
-            leaf port {
-                description 
-                   "The port number to use for the connection";
-                type int32;
-                config true;
-            }
-            leaf serialization-type {
-                description 
-                    "The serialization to use for this connection.";
-                type serialization;
-                config true;
+            // ******************
+            // Configuration Data
+            // ******************
+            container discovery-definitions {
+                description
+                    "The nodes that any OpFlex agent needs to communicate
+                     with in an OpFlex policy fabric.";
+
+                list policy-repository {
+                    description
+                        "A repository that the OpFlex agent can use for resolving
+                         policies.";
+
+                    key "id";
+
+                    leaf id {
+                        description 
+                           "The id for the policy repository.";
+                        type string;
+                        config true;
+                    }
+                    leaf port {
+                        description 
+                           "The port number to use for the connection";
+                        type int32;
+                        config true;
+                    }
+                    leaf serialization-type {
+                        description 
+                            "The serialization to use for this connection.";
+                        type serialization;
+                        config true;
+                    }
+                }
+
+                list endpoint-registry {
+                    description
+                        "A repository that the OpFlex agent can use for registration
+                         and lookup of endpoints.";
+
+                    key "id";
+
+                    leaf id {
+                        description 
+                           "The id for the endpoint registry.";
+                        type string;
+                        config true;
+                    }
+                    leaf port {
+                        description 
+                           "The port number to use for the connection";
+                        type int32;
+                        config true;
+                    }
+                    leaf serialization-type {
+                        description 
+                            "The serialization to use for this connection.";
+                        type serialization;
+                        config true;
+                    }
+                }
+
+                list observer {
+                    description
+                        "A repository that the OpFlex agent can send State Report
+                         messages to.";
+
+                    key "id";
+
+                    leaf id {
+                        description 
+                           "The id for the Observer.";
+                        type string;
+                        config true;
+                    }
+                    leaf port {
+                        description 
+                           "The port number to use for the connection";
+                        type int32;
+                        config true;
+                    }
+                    leaf serialization-type {
+                        description 
+                            "The serialization to use for this connection.";
+                        type serialization;
+                        config true;
+                    }
+                }
             }
         }
     }
index 39841e3339cd62061bc0cb05d11c985fef24c20b..28046062da50cf7acf2fbf6d5254e0f774d02dca 100644 (file)
@@ -12,7 +12,7 @@ package org.opendaylight.groupbasedpolicy.renderer.opflex;
 
 import static io.netty.buffer.Unpooled.copiedBuffer;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import io.netty.channel.embedded.EmbeddedChannel;
@@ -27,20 +27,25 @@ import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcDecoder;
 import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
 import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcServiceBinderHandler;
 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessageMap;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DiscoveryDefinitions;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DiscoveryDefinitionsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.EndpointRegistry;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.EndpointRegistryBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.Observer;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.ObserverBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.PolicyRepository;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.PolicyRepositoryBuilder;
-import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityResponse;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.Domains;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.Domain;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.DiscoveryDefinitions;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.DiscoveryDefinitionsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.discovery.definitions.EndpointRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.discovery.definitions.EndpointRegistryBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.discovery.definitions.Observer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.discovery.definitions.ObserverBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.discovery.definitions.PolicyRepository;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.discovery.definitions.PolicyRepositoryBuilder;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,17 +62,13 @@ import com.google.common.util.concurrent.ListenableFuture;
 public class OpflexConnectionServiceTest {
     protected static final Logger logger = LoggerFactory.getLogger(OpflexMessageTest.class);
 
-    static private final String TEST_RPC_MESSAGE_NAME = "test_message";
-
     static private final String TEST_EP_UUID = "85d53c32-47af-4eaf-82fd-ced653ff74da";
-    static private final String TEST_ID_UUID = "788950f6-2279-4ae1-820e-d277cea3623c";
     static public final String TEST_IP = "127.0.0.1";
     static public final String TEST_PORT = "57563";
 
     static private final String ID_UUID = "2da9e3d7-0bbe-4099-b343-12783777452f";
     static private final String SEND_IDENTITY = "send_identity";
-    static private final String POLICY_REQUEST = "resolve_policy";
-    static private final String DOMAIN_UUID = "75caaff2-cb4f-4509-b45e-47b447cb35a9";
+    static private final String DOMAIN_UUID = "default";
     static private final String NAME = "vm1";
     static private final String IDENTITY = "192.168.0.1:56732";
     static private final String opflexIdentityRequest =
@@ -76,7 +77,7 @@ public class OpflexConnectionServiceTest {
             "  \"params\": [ {" +
             "      \"name\":    \"" + NAME + "\"," +
             "      \"domain\":  \"" + DOMAIN_UUID + "\"," +
-            "      \"my_role\": [\"" + OpflexConnectionService.Role.POLICY_ELEMENT.toString() + "\"]" +
+            "      \"my_role\": [\"" + Role.POLICY_ELEMENT.toString() + "\"]" +
             "   }] }";
 
     @Mock
@@ -98,9 +99,23 @@ public class OpflexConnectionServiceTest {
     @Mock
     private ReadOnlyTransaction mockRead;
     @Mock
-    private ListenableFuture<Optional<DataObject>> mockOption;
+    private WriteTransaction mockWrite;
+    @Mock
+    private ListenableFuture<Optional<Domains>> mockOption;
+    @Mock
+    ListenableFuture<RpcResult<TransactionStatus>> mockStatus;
+    @Mock
+    private Optional<Domains> mockDao;
+    @Mock
+    private Domains mockDomains;
+    @Mock
+    private Domain mockDomain;
     @Mock
-    private Optional<DataObject> mockDao;
+    private OpflexDomain mockOpflexDomain;
+    @Mock
+    private OpflexRpcServer mockOpflexServer;
+    @Mock
+    private OpflexAgent mockAgent;
 
     @Before
     public void setUp() throws Exception {
@@ -110,10 +125,15 @@ public class OpflexConnectionServiceTest {
          * Mocks
          */
         when(mockDataBroker.newReadOnlyTransaction()).thenReturn(mockRead);
-        when(mockRead.read(LogicalDatastoreType.CONFIGURATION, OpflexConnectionService.
-                DISCOVERY_DEFINITIONS_IID)).thenReturn(mockOption);
+        when(mockDataBroker.newWriteOnlyTransaction()).thenReturn(mockWrite);
+        when(mockWrite.commit()).thenReturn(mockStatus);
+        when(mockRead.read(LogicalDatastoreType.CONFIGURATION,
+                OpflexConnectionService.DOMAINS_IID)).thenReturn(mockOption);
         when(mockOption.get()).thenReturn(mockDao);
-        when(mockDao.get()).thenReturn(dummyDefinitions);
+        when(mockDao.get()).thenReturn(mockDomains);
+        when(mockDomains.getDomain())
+        .thenReturn(new ArrayList<Domain>() {{ add(mockDomain); }});
+        when(mockDomain.getDiscoveryDefinitions()).thenReturn(dummyDefinitions);
 
         /*
          * Builders for creating our own discovery definitions
@@ -129,7 +149,6 @@ public class OpflexConnectionServiceTest {
         System.setProperty(OpflexConnectionService.OPFLEX_LISTENIP, TEST_IP);
     }
 
-
     //@Test
     public void testNoDefinitions() throws Exception {
 
@@ -163,45 +182,50 @@ public class OpflexConnectionServiceTest {
 
     //@Test
     public void testAddConnection() throws Exception {
-        opflexService = new OpflexConnectionService();
-        opflexService.setDataProvider(mockDataBroker);
-
-        when(mockEp.supportsMessages(opflexService.
-                policyRepositoryMessages)).thenReturn(true);
         when(mockEp.getIdentifier()).thenReturn(TEST_EP_UUID);
-
+        when(mockEp.getContext()).thenReturn(mockOpflexServer);
+        when(mockOpflexServer.getDomain()).thenReturn(mockOpflexDomain);
+        when(mockOpflexDomain.getDomain()).thenReturn(DOMAIN_UUID);
 
         opflexService = new OpflexConnectionService();
         opflexService.setDataProvider(mockDataBroker);
         opflexService.addConnection(mockEp);
-        verify(mockEp, Mockito.times(3)).supportsMessages(opflexService.policyRepositoryMessages);
-        verify(mockEp, Mockito.times(3)).getIdentifier();
-        assertTrue(opflexService.opflexAgents.size() == 1);
+        verify(mockEp, Mockito.times(2)).getIdentifier();
+        verify(mockOpflexDomain, Mockito.times(1)).addOpflexAgent((OpflexAgent)anyObject());
     }
 
     //@Test
     public void testChannelClosed() throws Exception {
-        opflexService = new OpflexConnectionService();
-        opflexService.setDataProvider(mockDataBroker);
-
-        JsonRpcEndpoint mockEp = mock(JsonRpcEndpoint.class);
-
-        when(mockEp.supportsMessages(opflexService.
-                policyRepositoryMessages)).thenReturn(true);
         when(mockEp.getIdentifier()).thenReturn(TEST_EP_UUID);
+        when(mockEp.getContext()).thenReturn(mockOpflexServer);
+        when(mockOpflexDomain.getDomain()).thenReturn(DOMAIN_UUID);
+        when(mockAgent.getDomain()).thenReturn(OpflexConnectionService.OPFLEX_DOMAIN);
 
 
         opflexService = new OpflexConnectionService();
         opflexService.setDataProvider(mockDataBroker);
+        when(mockOpflexServer.getDomain()).
+            thenReturn(opflexService.opflexDomains.get(OpflexConnectionService.OPFLEX_DOMAIN));
         opflexService.addConnection(mockEp);
-        assertTrue(opflexService.opflexAgents.size() == 1);
+
+        verify(mockEp, Mockito.times(2)).getIdentifier();
+
+        when(mockOpflexServer.getDomain()).thenReturn(mockOpflexDomain);
+        when(mockOpflexDomain.getOpflexAgent(TEST_EP_UUID)).thenReturn(mockAgent);
+        when(mockAgent.getDomain()).thenReturn(OpflexConnectionService.OPFLEX_DOMAIN);
+        when(mockAgent.getIdentity()).thenReturn(TEST_EP_UUID);
         opflexService.channelClosed(mockEp);
-        assertTrue(opflexService.opflexAgents.size() == 0);
+        verify(mockAgent).getIdentity();
     }
 
     //@Test
     public void testPublishSubscribeCallback() throws Exception {
 
+        List<Role> testRoles = new ArrayList<Role>();
+        testRoles.add(Role.POLICY_REPOSITORY);
+        testRoles.add(Role.ENDPOINT_REGISTRY);
+        testRoles.add(Role.OBSERVER);
+
         /*
          * This is *far* from UT, but worthwhile for now
          */
@@ -216,11 +240,16 @@ public class OpflexConnectionServiceTest {
         EmbeddedChannel channel = new EmbeddedChannel(decoder, binderHandler);
 
         RpcMessageMap messageMap = new RpcMessageMap();
-        IdentityRequest rpcMsg = new IdentityRequest();
-        messageMap.add(rpcMsg);
+        messageMap.addList(Role.POLICY_REPOSITORY.getMessages());
+
         JsonRpcEndpoint ep = new JsonRpcEndpoint(IDENTITY , opflexService,
                 objectMapper, channel, messageMap, opflexService);
+        ep.setContext(mockOpflexServer);
         binderHandler.setEndpoint(ep);
+
+        when(mockOpflexServer.getRoles()).thenReturn(testRoles);
+        when(mockOpflexServer.getDomain()).
+            thenReturn(opflexService.opflexDomains.get(OpflexConnectionService.OPFLEX_DOMAIN));
         opflexService.addConnection(ep);
         channel.writeInbound(copiedBuffer(opflexIdentityRequest, CharsetUtil.UTF_8));
         Object result = channel.readOutbound();
@@ -228,10 +257,10 @@ public class OpflexConnectionServiceTest {
         IdentityResponse resp = objectMapper.readValue(result.toString(), IdentityResponse.class);
         assertTrue(result != null);
         assertTrue(resp.getResult().getMy_role()
-                .contains(OpflexConnectionService.Role.ENDPOINT_REGISTRY.toString()));
+                .contains(Role.ENDPOINT_REGISTRY.toString()));
         assertTrue(resp.getResult().getMy_role()
-                .contains(OpflexConnectionService.Role.POLICY_REPOSITORY.toString()));
+                .contains(Role.POLICY_REPOSITORY.toString()));
         assertTrue(resp.getResult().getMy_role()
-                .contains(OpflexConnectionService.Role.OBSERVER.toString()));
+                .contains(Role.OBSERVER.toString()));
     }
 }
diff --git a/groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexDomainTest.java b/groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexDomainTest.java
new file mode 100644 (file)
index 0000000..3f9b9de
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ *  Authors : Thomas Bachman
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import io.netty.channel.Channel;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ *
+ */
+public class OpflexDomainTest {
+    protected static final Logger logger = LoggerFactory.getLogger(OpflexMessageTest.class);
+
+    private static final String TEST_DOMAIN = "default";
+    private static final String TEST_ID = "localhost:6671";
+    private OpflexDomain testDomain;
+    private List<Role> dummyRoles = null;
+
+    @Mock
+    private OpflexRpcServer mockServer;
+    @Mock
+    private Channel mockChannel;
+    @Mock
+    private RpcServer mockRpcServer;
+
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        dummyRoles = new ArrayList<Role>();
+        dummyRoles.add(Role.POLICY_REPOSITORY);
+
+        testDomain = new OpflexDomain();
+        testDomain.setDomain(TEST_DOMAIN);
+        when(mockServer.getRpcServer()).thenReturn(mockRpcServer);
+        when(mockRpcServer.getChannel()).thenReturn(mockChannel);
+        when(mockServer.getId()).thenReturn(TEST_ID);
+        when(mockServer.sameServer((OpflexRpcServer)anyObject())).thenReturn(false);
+        when(mockServer.getRoles()).thenReturn(dummyRoles);
+    }
+
+    @Test
+    public void testAddServers() throws Exception {
+        List<OpflexRpcServer> servers = new ArrayList<OpflexRpcServer>();
+        servers.add(mockServer);
+        testDomain.addServers(servers);
+        verify(mockServer).start();
+    }
+
+    @Test
+    public void testDropServers() throws Exception {
+        List<OpflexRpcServer> servers = new ArrayList<OpflexRpcServer>();
+        servers.add(mockServer);
+        testDomain.addServers(servers);
+
+        List<String> dropList = new ArrayList<String>();
+        dropList.add(TEST_ID);
+        testDomain.dropServers(dropList);
+        verify(mockServer).getRpcServer();
+        verify(mockRpcServer).getChannel();
+        verify(mockChannel).disconnect();
+    }
+
+    @Test
+    public void testAddDuplicateServer() throws Exception {
+        List<OpflexRpcServer> servers = new ArrayList<OpflexRpcServer>();
+        servers.add(mockServer);
+        testDomain.addServers(servers);
+        testDomain.addServers(servers);
+        verify(mockServer).getRpcServer();
+        verify(mockRpcServer).getChannel();
+        verify(mockChannel).disconnect();
+    }
+
+    @Test
+    public void testUpdateServers() throws Exception {
+        List<OpflexRpcServer> servers = new ArrayList<OpflexRpcServer>();
+        servers.add(mockServer);
+        testDomain.addServers(servers);
+        testDomain.addServers(servers);
+        verify(mockServer).getRpcServer();
+        verify(mockRpcServer).getChannel();
+        verify(mockChannel).disconnect();
+
+    }
+}
index 9edf0d96679480fe9948bfc301a39be488da8a9f..25a4c7dd5654d91548c2b4151f0eafafca5da59a 100644 (file)
@@ -15,6 +15,22 @@ import static org.junit.Assert.assertTrue;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointDeclarationRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointDeclarationResponse;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointPolicyUpdateRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointPolicyUpdateResponse;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointRequestRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointRequestResponse;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityResponse;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.PolicyResolutionRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.PolicyResolutionResponse;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.PolicyTriggerRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.PolicyTriggerResponse;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.PolicyUpdateRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.PolicyUpdateResponse;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.StateReportRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.StateReportResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexRpcServerTest.java b/groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/OpflexRpcServerTest.java
new file mode 100644 (file)
index 0000000..6eb07a8
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ *  Authors : Thomas Bachman
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.groupbasedpolicy.jsonrpc.ConnectionService;
+import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ *
+ */
+public class OpflexRpcServerTest implements ConnectionService {
+    protected static final Logger logger = LoggerFactory.getLogger(OpflexMessageTest.class);
+    private static final String TEST_IDENTITY = "localhost:6671";
+    private static final String TEST_IDENTITY2 = "localhost:6672";
+    private static final String TEST_DOMAIN = "default";
+
+    private OpflexRpcServer testServer = null;
+    private OpflexRpcServer ts1 = null;
+    private OpflexRpcServer ts2 = null;
+    private OpflexRpcServer ts3 = null;
+    private List<Role> roles = null;
+
+    @Mock
+    private OpflexDomain mockDomain;
+    @Mock
+    private RpcServer mockServer;
+    @Mock
+    private OpflexConnectionService mockService;
+
+    @Override
+    public void addConnection(JsonRpcEndpoint endpoint) {
+    }
+
+    @Override
+    public void channelClosed(JsonRpcEndpoint endpoint) throws Exception {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        roles = new ArrayList<Role>();
+        roles.add(Role.POLICY_REPOSITORY);
+
+        testServer =
+                new OpflexRpcServer(mockDomain, TEST_IDENTITY, roles);
+        testServer.setRpcBroker(mockService);
+        testServer.setConnectionService(mockService);
+
+        ts1 = new OpflexRpcServer(mockDomain, TEST_IDENTITY, roles);
+        ts2 = new OpflexRpcServer(mockDomain, TEST_IDENTITY2, roles);
+        roles = new ArrayList<Role>();
+        roles.add(Role.POLICY_ELEMENT);
+        ts3 = new OpflexRpcServer(mockDomain, TEST_IDENTITY2, roles);
+        when(mockDomain.getDomain()).thenReturn(TEST_DOMAIN);
+    }
+
+
+    @Test
+    public void testStart() throws Exception {
+        testServer.start();
+        assertTrue(testServer.getRpcServer() != null);
+    }
+
+    @Test
+    public void testSameServer() throws Exception {
+        assertTrue(testServer.sameServer(ts1));
+        assertFalse(testServer.sameServer(ts2));
+        assertFalse(testServer.sameServer(ts3));
+    }
+
+}
diff --git a/groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/RoleTest.java b/groupbasedpolicy/src/test/java/org/opendaylight/groupbasedpolicy/renderer/opflex/RoleTest.java
new file mode 100644 (file)
index 0000000..39f8c7c
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ *  Authors : Thomas Bachman
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ *
+ */
+public class RoleTest {
+    protected static final Logger logger = LoggerFactory.getLogger(RoleTest.class);
+
+    private boolean idReq;
+    private boolean idRsp;
+
+    @Before
+    public void setUp() throws Exception {
+    }
+
+    @Test
+    public void testPolicyRepository() throws Exception {
+        idReq = false;
+        idRsp = false;
+
+        List<RpcMessage> messages = Role.POLICY_REPOSITORY.getMessages();
+        for (RpcMessage msg : messages) {
+            if (msg instanceof IdentityRequest) {
+                idReq = true;
+            }
+            if (msg instanceof IdentityResponse) {
+                idRsp = true;
+            }
+        }
+        assertTrue(idReq == true);
+        assertTrue(idRsp == true);
+    }
+
+    @Test
+    public void testEndpointRegistry() throws Exception {
+        idReq = false;
+        idRsp = false;
+
+        List<RpcMessage> messages = Role.ENDPOINT_REGISTRY.getMessages();
+        for (RpcMessage msg : messages) {
+            if (msg instanceof IdentityRequest) {
+                idReq = true;
+            }
+            if (msg instanceof IdentityResponse) {
+                idRsp = true;
+            }
+        }
+        assertTrue(idReq == true);
+        assertTrue(idRsp == true);
+    }
+
+    @Test
+    public void testObserver() throws Exception {
+        idReq = false;
+        idRsp = false;
+
+        List<RpcMessage> messages = Role.OBSERVER.getMessages();
+        for (RpcMessage msg : messages) {
+            if (msg instanceof IdentityRequest) {
+                idReq = true;
+            }
+            if (msg instanceof IdentityResponse) {
+                idRsp = true;
+            }
+        }
+        assertTrue(idReq == true);
+        assertTrue(idRsp == true);
+    }
+
+}
\ No newline at end of file