final ListenerRegistration<DataChangeListener> dataChangeListenerRegistration =
dataBrokerService
.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
- OpflexConnectionService.DISCOVERY_DEFINITIONS_IID,
+ OpflexConnectionService.DOMAINS_IID,
connectionService, DataChangeScope.SUBTREE );
final class AutoCloseableConnectionService implements AutoCloseable {
*
* Generated from: yang module name: opflex-provider-impl yang module local name: opflex-provider-impl
* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
-* Generated at: Mon Jul 07 21:34:41 UTC 2014
+* Generated at: Wed Jul 16 08:30:30 UTC 2014
*
* Do not modify this file unless it is present under src/main directory
*/
package org.opendaylight.groupbasedpolicy.jsonrpc;
-/*
+/**
* An interface to provide notifications when connections are
* established or closed. The connection notifications
* use{@link RpcEncpoint} objects; as connections come and go,
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
}
private String identifier;
+ private Object context;
private ObjectMapper objectMapper;
private Channel nettyChannel;
private Map<String, CallContext> methodContext = Maps.newHashMap();
this.identifier = identifier;
}
+ public Object getContext() {
+ return context;
+ }
+
+ public void setContext(Object context) {
+ this.context = context;
+ }
+
public ConnectionService getConnectionService() {
return connectionService;
}
return nettyChannel;
}
- public boolean supportsMessages(List<RpcMessage> messages) {
- return messageMap.containsMessages(messages);
- }
-
public JsonRpcEndpoint(String identifier, ConnectionService connectionService,
ObjectMapper objectMapper, Channel channel,
RpcMessageMap messageMap, RpcBroker broker) {
String identity;
int listenPort;
Channel channel;
+ Object context;
RpcMessageMap messageMap;
ConnectionService connectionService;
RpcBroker broker;
this.identity = identity;
}
+ public Object getContext() {
+ return context;
+ }
+
+ public void setContext(Object context) {
+ this.context = context;
+ }
+
public void addMessage(RpcMessage message) {
this.messageMap.add(message);
}
JsonRpcEndpoint endpoint = new JsonRpcEndpoint(identifier, connectionService,
objectMapper, channel, messageMap, broker);
+ endpoint.setContext(context);
JsonRpcServiceBinderHandler binderHandler =
new JsonRpcServiceBinderHandler(endpoint);
channel.pipeline().addLast(binderHandler);
* @author readams
*/
public class OFOverlayRenderer implements AutoCloseable, DataChangeListener {
- private static final Logger LOG =
+ private static final Logger LOG =
LoggerFactory.getLogger(OFOverlayRenderer.class);
private final DataBroker dataBroker;
private final SwitchManager switchManager;
private final EndpointManager endpointManager;
private final PolicyManager policyManager;
-
+
private final ScheduledExecutorService executor;
- private static final InstanceIdentifier<OfOverlayConfig> configIid =
+ private static final InstanceIdentifier<OfOverlayConfig> configIid =
InstanceIdentifier.builder(OfOverlayConfig.class).build();
-
+
private OfOverlayConfig config;
ListenerRegistration<DataChangeListener> configReg;
int numCPU = Runtime.getRuntime().availableProcessors();
executor = Executors.newScheduledThreadPool(numCPU * 2);
-
+
policyResolver = new PolicyResolver(dataProvider, executor);
switchManager = new SwitchManager(dataProvider, executor);
- endpointManager = new EndpointManager(dataProvider, rpcRegistry,
+ endpointManager = new EndpointManager(dataProvider, rpcRegistry,
executor, switchManager);
-
+
policyManager = new PolicyManager(dataProvider,
- policyResolver,
+ policyResolver,
switchManager,
endpointManager,
rpcRegistry,
executor);
-
- configReg =
- dataProvider.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
- configIid,
- this,
+
+ configReg =
+ dataProvider.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
+ configIid,
+ this,
DataChangeScope.SUBTREE);
readConfig();
LOG.info("Initialized OFOverlay renderer");
if (switchManager != null) switchManager.close();
if (endpointManager != null) endpointManager.close();
}
-
+
// ******************
// DataChangeListener
// ******************
-
+
@Override
- public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>,
+ public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>,
DataObject> change) {
readConfig();
}
// **************
// Implementation
// **************
-
+
private void readConfig() {
- ListenableFuture<Optional<DataObject>> dao =
+ ListenableFuture<Optional<OfOverlayConfig>> dao =
dataBroker.newReadOnlyTransaction()
.read(LogicalDatastoreType.CONFIGURATION, configIid);
- Futures.addCallback(dao, new FutureCallback<Optional<DataObject>>() {
+ Futures.addCallback(dao, new FutureCallback<Optional<OfOverlayConfig>>() {
@Override
- public void onSuccess(final Optional<DataObject> result) {
+ public void onSuccess(final Optional<OfOverlayConfig> result) {
if (!result.isPresent()) return;
if (result.get() instanceof OfOverlayConfig) {
config = (OfOverlayConfig)result.get();
}
}, executor);
}
-
+
private void applyConfig() {
switchManager.setEncapsulationFormat(config.getEncapsulationFormat());
endpointManager.setLearningMode(config.getLearningMode());
import com.google.common.util.concurrent.ListenableFuture;
/**
- * Manage connected switches and ensure their configuration is set up
+ * Manage connected switches and ensure their configuration is set up
* correctly
* @author readams
*/
public class SwitchManager implements AutoCloseable, DataChangeListener {
- private static final Logger LOG =
+ private static final Logger LOG =
LoggerFactory.getLogger(SwitchManager.class);
private final DataBroker dataProvider;
.child(Node.class).build();
private ListenerRegistration<DataChangeListener> nodesReg;
- private ConcurrentHashMap<NodeId, SwitchState> switches =
+ private ConcurrentHashMap<NodeId, SwitchState> switches =
new ConcurrentHashMap<>();
private List<SwitchListener> listeners = new CopyOnWriteArrayList<>();
super();
this.dataProvider = dataProvider;
nodesReg = dataProvider
- .registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
- nodeIid, this,
+ .registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
+ nodeIid, this,
DataChangeScope.ONE);
readSwitches();
LOG.debug("Initialized OFOverlay switch manager");
// *************
// SwitchManager
// *************
-
+
/**
* Get the collection of switches that are in the "ready" state. Note
* that the collection may be concurrently modified
* @return A {@link Collection} containing the switches that are ready.
*/
public Collection<NodeId> getReadySwitches() {
- Collection<SwitchState> ready =
- Collections2.filter(switches.values(),
+ Collection<SwitchState> ready =
+ Collections2.filter(switches.values(),
new Predicate<SwitchState>() {
@Override
public boolean apply(SwitchState input) {
- return SwitchStatus.READY.equals(input.status);
+ return SwitchStatus.READY.equals(input.status);
}
});
- return Collections2.transform(ready,
+ return Collections2.transform(ready,
new Function<SwitchState, NodeId>() {
@Override
public NodeId apply(SwitchState input) {
if (state == null) return false;
return SwitchStatus.READY.equals(state.status);
}
-
+
/**
* Add a {@link SwitchListener} to get notifications of switch events
* @param listener the {@link SwitchListener} to add
// ******************
@Override
- public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>,
+ public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>,
DataObject> change) {
for (InstanceIdentifier<?> iid : change.getRemovedPaths()) {
LOG.info("{} removed", iid);
updateSwitch(dao);
}
}
-
+
// **************
// Implementation
// **************
-
+
private void updateSwitch(DataObject dao) {
if (!(dao instanceof Node)) return;
// Switches are registered as Nodes in the inventory; OpenFlow switches
if (fcn == null) return;
LOG.debug("{} update", node.getId());
-
- SwitchState state = switches.get(node.getId());
+
+ SwitchState state = switches.get(node.getId());
if (state == null) {
state = new SwitchState(node);
- SwitchState old =
+ SwitchState old =
switches.putIfAbsent(node.getId(), state);
if (old == null) {
switchConnected(node.getId());
}
}
}
-
- // XXX there's a race condition here if a switch exists at startup and is
+
+ // XXX there's a race condition here if a switch exists at startup and is
// removed very quickly.
- private final FutureCallback<Optional<DataObject>> readSwitchesCallback =
- new FutureCallback<Optional<DataObject>>() {
+ private final FutureCallback<Optional<Nodes>> readSwitchesCallback =
+ new FutureCallback<Optional<Nodes>>() {
@Override
- public void onSuccess(Optional<DataObject> result) {
+ public void onSuccess(Optional<Nodes> result) {
if (result.isPresent() && result.get() instanceof Nodes) {
Nodes nodes = (Nodes)result.get();
for (Node node : nodes.getNode()) {
LOG.error("Count not read switch information", t);
}
};
-
+
/**
* Read the set of switches from the ODL inventory and update our internal
* map.
- *
- * <p>This is safe only if there can only be one notification at a time,
- * as there are race conditions in the face of concurrent data change
+ *
+ * <p>This is safe only if there can only be one notification at a time,
+ * as there are race conditions in the face of concurrent data change
* notifications
*/
private void readSwitches() {
- ListenableFuture<Optional<DataObject>> future =
+ ListenableFuture<Optional<Nodes>> future =
dataProvider.newReadOnlyTransaction()
.read(LogicalDatastoreType.OPERATIONAL,nodesIid);
Futures.addCallback(future, readSwitchesCallback);
}
-
+
/**
* Set the ready state of the node to PREPARING and begin the initialization
* process
LOG.info("New switch {} connected", nodeId);
}
}
-
+
/**
* Set the ready state of the node to READY and notify listeners
*/
}
LOG.info("Switch {} removed", nodeId);
}
-
+
private enum SwitchStatus {
/**
* The switch is connected but not yet configured
*/
READY
}
-
+
/**
* Internal representation of the state of a connected switch
*/
super();
this.switchNode = switchNode;
}
-
+
}
}
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
* @author readams
*/
public abstract class FlowTable {
- protected static final Logger LOG =
+ protected static final Logger LOG =
LoggerFactory.getLogger(FlowTable.class);
/**
public static class FlowTableCtx {
protected final DataBroker dataBroker;
protected final RpcProviderRegistry rpcRegistry;
-
+
protected final PolicyManager policyManager;
protected final SwitchManager switchManager;
protected final EndpointManager endpointManager;
-
+
protected final PolicyResolver policyResolver;
-
+
protected final ScheduledExecutorService executor;
public FlowTableCtx(DataBroker dataBroker,
this.policyResolver = policyResolver;
this.executor = executor;
}
-
+
}
-
+
protected final FlowTableCtx ctx;
public FlowTable(FlowTableCtx ctx) {
// *********
// FlowTable
// *********
-
+
/**
- * Update the relevant flow table for the node
+ * Update the relevant flow table for the node
* @param nodeId the node to update
* @param dirty the dirty set
- * @throws Exception
+ * @throws Exception
*/
public void update(NodeId nodeId, Dirty dirty) throws Exception {
ReadWriteTransaction t = ctx.dataBroker.newReadWriteTransaction();
- InstanceIdentifier<Table> tiid =
+ InstanceIdentifier<Table> tiid =
FlowUtils.createTablePath(nodeId, getTableId());
- Optional<DataObject> r =
+ Optional<Table> r =
t.read(LogicalDatastoreType.CONFIGURATION, tiid).get();
HashMap<String, FlowCtx> flowMap = new HashMap<>();
FlowUtils.createFlowPath(tiid, fx.f.getKey()));
}
}
-
+
ListenableFuture<RpcResult<TransactionStatus>> result = t.commit();
Futures.addCallback(result, updateCallback);
}
/**
* Sync flow state using the flow map
- * @throws Exception
+ * @throws Exception
*/
public abstract void sync(ReadWriteTransaction t,
InstanceIdentifier<Table> tiid,
Map<String, FlowCtx> flowMap,
NodeId nodeId, Dirty dirty) throws Exception;
-
+
/**
* Get the table ID being manipulated
*/
public abstract short getTableId();
-
+
// ***************
// Utility methods
// ***************
.setBarrier(false)
.setHardTimeout(0)
.setIdleTimeout(0);
- }
-
+ }
+
/**
* Generic callback for handling result of flow manipulation
* @author readams
@Override
public void onFailure(Throwable t) {
- LOG.error("Failed to add flow entry", t);
+ LOG.error("Failed to add flow entry", t);
}
}
protected static final FlowCallback<TransactionStatus> updateCallback =
* @param flowId the ID for the flow
* @return <code>true</code> if the flow needs to be added
*/
- protected static boolean visit(Map<String, FlowCtx> flowMap,
+ protected static boolean visit(Map<String, FlowCtx> flowMap,
String flowId) {
FlowCtx c = flowMap.get(flowId);
if (c != null) {
}
return true;
}
-
+
/**
* Write the given flow to the transaction
*/
InstanceIdentifier<Table> tiid,
Flow flow) {
LOG.trace("{} {}", flow.getId(), flow);
- t.put(LogicalDatastoreType.CONFIGURATION,
- FlowUtils.createFlowPath(tiid, flow.getId()),
+ t.put(LogicalDatastoreType.CONFIGURATION,
+ FlowUtils.createFlowPath(tiid, flow.getId()),
flow);
}
-
+
/**
* Context object for keeping track of flow state
*/
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.List;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
+
+/**
+ * Simple mutable holder describing one connected OpFlex agent: its
+ * identity, the OpFlex administrative domain it belongs to, the list of
+ * {@link Role}s it advertises, the {@link JsonRpcEndpoint} used to talk
+ * to it, and the {@link OpflexRpcServer} that accepted the connection.
+ * All state is exposed through plain getters/setters; no validation is
+ * performed and the class is not thread-safe.
+ */
+public class OpflexAgent {
+ String identity; // agent identity -- presumably "ip:port"; TODO confirm format
+ String domain; // name of the owning OpFlex domain
+ List<Role> roles; // roles advertised by this agent
+ JsonRpcEndpoint endpoint; // JSON-RPC connection to the agent
+ OpflexRpcServer opflexServer; // server that accepted this connection
+
+ public OpflexAgent() {
+ }
+
+ public String getDomain() {
+ return domain;
+ }
+
+ public void setDomain(String domain) {
+ this.domain = domain;
+ }
+
+ public String getIdentity() {
+ return identity;
+ }
+
+ public void setIdentity(String identity) {
+ this.identity = identity;
+ }
+
+ public OpflexRpcServer getOpflexServer() {
+ return opflexServer;
+ }
+
+ public void setOpflexServer(OpflexRpcServer server) {
+ this.opflexServer = server;
+ }
+
+ public List<Role> getRoles() {
+ return roles;
+ }
+
+ public void setRoles(List<Role> roles) {
+ this.roles = roles;
+ }
+
+ public JsonRpcEndpoint getEndpoint() {
+ return endpoint;
+ }
+
+ public void setEndpoint(JsonRpcEndpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+}
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.groupbasedpolicy.jsonrpc.ConnectionService;
import org.opendaylight.groupbasedpolicy.jsonrpc.RpcBroker;
import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
import org.opendaylight.groupbasedpolicy.jsonrpc.RpcServer;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DiscoveryDefinitions;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.EndpointRegistry;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.Observer;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.PolicyRepository;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityResponse;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.Domains;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DomainsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.Domain;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.DomainKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.DiscoveryDefinitions;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.discovery.definitions.EndpointRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.discovery.definitions.Observer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.discovery.definitions.PolicyRepository;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-/*
+/**
* Manages the different OpFlex entity connections. It does this
* on behalf of each logical OpFlex entity:
* o Policy Repositories
* Servers and connections are maintained in dedicated client and
* server maps.
*
- * TODO: calls to add messages to policy repository, EP registry , and observer
- * TODO: incorporate OpFlex domain
- * TODO: break into smaller pieces?
+ * @author tbachman
+ *
+ * TODO: Still too big - need to separate
*/
public class OpflexConnectionService
implements ConnectionService, RpcBroker,
protected static final Logger logger =
LoggerFactory.getLogger(OpflexConnectionService.class);
- public enum Role {
- POLICY_REPOSITORY("policy_repository"),
- ENDPOINT_REGISTRY("endpoint_registry"),
- OBSERVER("observer"),
- POLICY_ELEMENT("policy_element");
-
- private String role;
- Role(String role) {
- this.role = role;
- }
- @Override
- public String toString() {
- return this.role;
- }
- }
-
- private static class OpflexConnection {
- String identity;
- List<Role> roles;
- JsonRpcEndpoint endpoint;
-
- public OpflexConnection() {
- }
-
- public String getIdentity() {
- return identity;
- }
-
- public void setIdentity(String identity) {
- this.identity = identity;
- }
-
- public List<Role> getRoles() {
- return roles;
- }
-
- public void setRoles(List<Role> roles) {
- this.roles = roles;
- }
-
- public JsonRpcEndpoint getEndpoint() {
- return endpoint;
- }
-
- public void setEndpoint(JsonRpcEndpoint endpoint) {
- this.endpoint = endpoint;
- }
-
- }
-
- public static class OpflexRpcServer {
- private String identity;
- private List<Role> roles;
- private RpcServer server;
-
- public OpflexRpcServer() {
- roles = new ArrayList<Role>();
- }
-
- public OpflexRpcServer(String identity) {
- this.identity = identity;
- }
-
- public OpflexRpcServer(String identity, List<Role> roles) {
- this.identity = identity;
- this.roles = roles;
- }
-
- public String getId() {
- return this.identity;
- }
-
- public void setRpcServer(RpcServer server) {
- this.server = server;
- }
-
- public RpcServer getRpcServer() {
- return this.server;
- }
-
- public void addRole(Role role) {
- if (!this.roles.contains(role))
- this.roles.add(role);
- }
-
- public List<Role> getRoles() {
- return this.roles;
- }
-
- public boolean sameServer(OpflexRpcServer srv) {
- if (this == srv)
- return true;
- if (srv == null)
- return false;
- if (!this.identity.equals(srv.identity))
- return false;
- if (this.roles == null && srv.roles == null)
- return true;
- if (this.roles == null || srv.roles == null)
- return false;
- if (this.roles.size() == srv.roles.size() && this.roles.containsAll(srv.roles))
- return true;
- return false;
- }
- }
+ static final String OPFLEX_DOMAIN = "default";
+ static final String INVALID_DOMAIN = "Domain mismatch";
// Properties that can be set in config.ini
static final String OPFLEX_LISTENPORT = "opflex.listenPort";
private static final Integer defaultOpflexPort = 6670;
private Integer opflexListenPort = defaultOpflexPort;
private String opflexListenIp = defaultOpflexIp;
- ConcurrentMap<String, OpflexConnection> opflexAgents = null;
- ConcurrentMap<String, OpflexRpcServer> opflexServers = null;
+ private final ScheduledExecutorService executor;
+
+ List<Domain> domainList = null;
+ ConcurrentMap<String, OpflexDomain> opflexDomains = null;
ConcurrentMap<String, List<RpcCallback>> brokerMap = null;
- List<RpcMessage> policyRepositoryMessages;
- List<RpcMessage> endpointRegistryMessages;
- List<RpcMessage> observerMessages;
+
private DataBroker dataProvider;
- public static final InstanceIdentifier<DiscoveryDefinitions> DISCOVERY_DEFINITIONS_IID =
- InstanceIdentifier.builder(DiscoveryDefinitions.class).build();
+ public static final InstanceIdentifier<Domains> DOMAINS_IID =
+ InstanceIdentifier.builder(Domains.class).build();
+ public InstanceIdentifier<Domain> domainIid(DomainKey domainKey) {
+ return InstanceIdentifier.builder(Domains.class).child(Domain.class, domainKey)
+ .build();
+ }
+
+ public OpflexConnectionService() {
+ int numCPU = Runtime.getRuntime().availableProcessors();
+ executor = Executors.newScheduledThreadPool(numCPU * 2);
+ }
/**
*
public void setDataProvider(DataBroker salDataProvider) {
dataProvider = salDataProvider;
- startOpflexManager();
- }
-
- private DiscoveryDefinitions getDiscoveryDefinitions() {
-
- ReadTransaction t = dataProvider.newReadOnlyTransaction();
- ListenableFuture<Optional<DataObject>> f = t.read(LogicalDatastoreType.CONFIGURATION, DISCOVERY_DEFINITIONS_IID);
- try {
- Optional<DataObject> dao = f.get();
- if (dao.get() != null && dao.get() instanceof DiscoveryDefinitions) {
- return (DiscoveryDefinitions)dao.get();
- }
- }
- catch ( Exception e ) {
- logger.warn("Not sure what happens here");
- }
- return null;
-
+ start();
}
- private List<OpflexRpcServer> setDefaultIdentities() {
+ private List<OpflexRpcServer> setDefaultIdentities(OpflexDomain domain) {
/*
* Create a single server, filling all roles
roles.add(Role.POLICY_REPOSITORY);
roles.add(Role.ENDPOINT_REGISTRY);
roles.add(Role.OBSERVER);
- OpflexRpcServer srv = new OpflexRpcServer(identity, roles);
+
+ OpflexDomain od = new OpflexDomain();
+ od.setDomain(OPFLEX_DOMAIN);
+ OpflexRpcServer srv = new OpflexRpcServer(od, identity, roles);
+ srv.setConnectionService(this);
+ srv.setRpcBroker(this);
srvList.add(srv);
return srvList;
}
- private List<OpflexRpcServer> createServerList() {
- DiscoveryDefinitions identities = getDiscoveryDefinitions();
+ private List<OpflexRpcServer> createServerList(OpflexDomain d, Domain domain) {
+
+ DiscoveryDefinitions identities = domain.getDiscoveryDefinitions();
if (identities != null) {
Map<String, OpflexRpcServer> servers =
new ConcurrentHashMap<String, OpflexRpcServer>();
List<String> addList = getPolicyRepositories(identities.getPolicyRepository());
- addServerList(servers, addList, Role.POLICY_REPOSITORY);
+ addServerList(d, servers, addList, Role.POLICY_REPOSITORY);
addList = getEndpointRegistries(identities.getEndpointRegistry());
- addServerList(servers, addList, Role.ENDPOINT_REGISTRY);
+ addServerList(d, servers, addList, Role.ENDPOINT_REGISTRY);
addList = getObservers(identities.getObserver());
- addServerList(servers, addList, Role.OBSERVER);
+ addServerList(d, servers, addList, Role.OBSERVER);
return(new ArrayList<OpflexRpcServer>(servers.values()));
}
- else {
- return setDefaultIdentities();
- }
+ return null;
+ }
+
+ /**
+ * Write an empty top-level {@code Domains} container into the
+ * CONFIGURATION datastore so that child {@code Domain} entries can be
+ * written afterwards. A write failure is only logged; success needs
+ * no further action.
+ */
+ private void initializeConfig() {
+ // XXX - This is a hack to avoid a bug in the data broker
+ // API where you have to write all the parents before you can write
+ // a child
+ WriteTransaction t = dataProvider.newWriteOnlyTransaction();
+ t.put(LogicalDatastoreType.CONFIGURATION, DOMAINS_IID, new DomainsBuilder().build());
+ ListenableFuture<RpcResult<TransactionStatus>> f = t.commit();
+ Futures.addCallback(f, new FutureCallback<RpcResult<TransactionStatus>>() {
+
+ @Override
+ public void onSuccess(RpcResult<TransactionStatus> result) {
+
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ logger.error("Could not write domain base container", t);
+ }
+ });
+ }
private void initializeServers() {
+ OpflexDomain od;
+
+ //initializeConfig();
+ readConfig();
/*
* Get the configured identities, if any. If lists are empty,
* set up a single instance of each, using the localhost
- * interface
+ * interface, all inside a default domain
*/
- List<OpflexRpcServer> serverList = createServerList();
- addServers(serverList);
+ if (domainList != null && domainList.size() > 0) {
+ for (Domain d : domainList) {
+ od = opflexDomains.get(d.getId());
+ if (od == null) continue;
+ List<OpflexRpcServer> serverList = createServerList(od, d);
+ od.addServers(serverList);
+ }
+ }
+ else {
+ // TODO: should also write into config store?
+ logger.warn("Setting default identities");
+ od = new OpflexDomain();
+ od.setDomain(OPFLEX_DOMAIN);
+ od.addServers(setDefaultIdentities(od));
+ opflexDomains.put(od.getDomain(), od);
+ }
}
return identityList;
}
- private void addServerList( Map<String, OpflexRpcServer> servers,
+ private void addServerList( OpflexDomain d, Map<String, OpflexRpcServer> servers,
List<String> idList, Role role ) {
if (idList == null || idList.size() <= 0)
return;
}
roles.add(role);
- srv = new OpflexRpcServer(id, roles);
+ srv = new OpflexRpcServer(d, id, roles);
+ srv.setConnectionService(this);
+ srv.setRpcBroker(this);
servers.put(id, srv);
}
}
- private void launchRpcServer(OpflexRpcServer srv) {
- RpcServer rpcSrv = new RpcServer(srv.getId().split(":")[0],
- Integer.parseInt(srv.getId().split(":")[1]));
- rpcSrv.setConnectionService(this);
- rpcSrv.setRpcBroker(this);
- /*
- * Make sure the server is configured for the proper messages
- */
- List<Role> roles = srv.getRoles();
- for ( Role role : roles ) {
- switch (role) {
- case POLICY_REPOSITORY:
- {
- rpcSrv.addMessageList(this.policyRepositoryMessages);
- }
- break;
- case ENDPOINT_REGISTRY:
- {
- rpcSrv.addMessageList(this.endpointRegistryMessages);
- }
- break;
- case OBSERVER:
- {
- rpcSrv.addMessageList(this.observerMessages);
- }
- break;
- default:
- {
- logger.warn("Invalid Role {}", role );
- }
- break;
- }
- }
- srv.setRpcServer(rpcSrv);
- opflexServers.put(srv.getId(), srv);
+ /**
+ * We store the {@link OpflexDomain} in the {@link JsonRpcEndpoint}'s
+ * context field when the {@link RpcServer} creates the new connection.
+ *
+ * @param endpoint The endpoint to look up
+ * @return The OpflexDomain that owns this endpoint
+ *
+ * TODO: should throw an exception if there is no
+ * OpflexDomain that contains this endpoint
+ */
+ public OpflexDomain getOpflexDomain(JsonRpcEndpoint endpoint) {
+ if (endpoint.getContext() instanceof OpflexRpcServer) {
+ OpflexRpcServer srv = (OpflexRpcServer)endpoint.getContext();
+ return srv.getDomain();
+ }
+ logger.warn("endpoint {} does not have a domain", endpoint.getIdentifier());
+ return null;
+ }
- new Thread() {
- private RpcServer server;
+ /**
+ * Find the {@link OpflexAgent} that owns this
+ * {@link JsonRpcEndpoint}.
+ *
+ * @param endpoint The endpoint to look up
+ * @return The OpflexConnection that owns this endpoint
+ *
+ * TODO: should throw an exception if there is no
+ * OpflexConnection that contains this endpoint
+ */
+ public OpflexAgent getOpflexConnection(JsonRpcEndpoint endpoint) {
- public Thread initializeServerParams(RpcServer server) {
- this.server = server;
- return this;
- }
- @Override
- public void run() {
- try {
- server.start();
- } catch (Exception e) {
- logger.warn("Exception starting new server {}", e);
- }
- }
- }.initializeServerParams(rpcSrv).start();
+ OpflexDomain od = getOpflexDomain(endpoint);
+ if (od != null) {
+ return od.getOpflexAgent(endpoint.getIdentifier());
+ }
+ logger.warn("Couldn't find OpflexConnection for {}", endpoint.getIdentifier());
+ return null;
}
- private void addServers(List<OpflexRpcServer> idMap) {
- /*
- * Check to see if there's already a server
- * with this identity, and if so, close it
- * and replace it with this one.
- */
- for ( OpflexRpcServer srv: idMap ) {
- OpflexRpcServer server = opflexServers.get(srv.getId());
- if (server != null) {
- if ( !server.sameServer(srv)) {
- OpflexRpcServer oldServer = opflexServers.remove(srv.getId());
- oldServer.getRpcServer().getChannel().disconnect();
- launchRpcServer(srv);
- }
- }
- else {
- launchRpcServer(srv);
- }
+ /**
+ * Get the OpflexRpcServer that spawned this endpoint.
+ *
+ * @param endpoint The endpoint to look up
+ * @return The OpflexRpcServer that owns this endpoint, or
+ * null if the server no longer exists
+ *
+ * TODO: exception if the endpoint is owned by anything
+ */
+ public OpflexRpcServer getOpflexServer(JsonRpcEndpoint endpoint) {
+ if (endpoint.getContext() instanceof OpflexRpcServer) {
+ return (OpflexRpcServer)endpoint.getContext();
}
+ logger.warn("Couldn't find OpflexConnection for endpoint {}",
+ endpoint.getIdentifier());
+ return null;
}
- private void dropServers(List<String> oldServers) {
- OpflexRpcServer server;
- /*
- * Check to see if there's already a server
- * with this identity, and if so, close it
- * and replace it with this one.
- */
- for (String identity: oldServers) {
- if (opflexServers.containsKey(identity)) {
- server = opflexServers.remove(identity);
- server.getRpcServer().getChannel().disconnect();
- }
- }
- }
-
- public void startOpflexManager() {
- opflexAgents = new ConcurrentHashMap<String, OpflexConnection>();
- opflexServers = new ConcurrentHashMap<String, OpflexRpcServer>();
+ public void start() {
+ opflexDomains = new ConcurrentHashMap<String, OpflexDomain>();
brokerMap = new ConcurrentHashMap<String, List<RpcCallback>>();
/*
* Set up the messages supported by each OpFlex policy
* component
*/
- policyRepositoryMessages = new ArrayList<RpcMessage>();
- endpointRegistryMessages = new ArrayList<RpcMessage>();
- observerMessages = new ArrayList<RpcMessage>();
-
- IdentityRequest idRequest = new IdentityRequest();
- policyRepositoryMessages.add(idRequest);
- endpointRegistryMessages.add(idRequest);
- observerMessages.add(idRequest);
-
/* this class implements identity handlers */
- subscribe(idRequest, this);
-
- IdentityResponse idResponse = new IdentityResponse();
- policyRepositoryMessages.add(idResponse);
- endpointRegistryMessages.add(idResponse);
- observerMessages.add(idResponse);
+ subscribe(new IdentityRequest(), this);
initializeServers();
}
* connections and servers.
*/
public void stopping() {
- for (OpflexConnection connection : opflexAgents.values()) {
- connection.getEndpoint().getChannel().disconnect();
- }
- for (OpflexRpcServer server : opflexServers.values() ) {
- if (server.getRpcServer().getChannel() != null) {
- server.getRpcServer().getChannel().disconnect();
- }
+ for (OpflexDomain d : opflexDomains.values()) {
+ d.cleanup();
}
}
- /**
- * Remove the OpFlex connection/agent from the map
- *
- * @param identifier The identity of the connection that was closed
- */
- public void removeConnection(String identifier) {
- opflexAgents.remove(identifier);
- }
- /**
- * Add a server with the given identity
- *
- * @param identity The IP address/socket pair for the server
- * @param server The instantiated server
- */
- public void addServer(String identity, OpflexRpcServer server) {
- opflexServers.put(identity, server);
+ private void deleteDomain(String domain) {
+
+ OpflexDomain od = opflexDomains.remove(domain);
+ if (od != null) {
+ od.cleanup();
+ }
}
/**
- * Implemented from the AutoCloseable interface.
+ * Close the connection service. Implemented from the
+ * AutoCloseable interface.
*/
@Override
public void close() throws ExecutionException, InterruptedException {
+ executor.shutdownNow();
+
if (dataProvider != null) {
WriteTransaction t = dataProvider.newWriteOnlyTransaction();
- t.delete(LogicalDatastoreType.CONFIGURATION, DISCOVERY_DEFINITIONS_IID);
+ t.delete(LogicalDatastoreType.CONFIGURATION, DOMAINS_IID);
t.commit().get();
}
}
- @Override
- public void onDataChanged( final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject>change) {
-
- List<String> addList = new ArrayList<String>();
- List <String> dropList = new ArrayList<String>();
-
- /* Get the new list of configured servers */
- List<OpflexRpcServer> serverList = createServerList();
+ private void readConfig() {
+ ListenableFuture<Optional<Domains>> dao =
+ dataProvider.newReadOnlyTransaction()
+ .read(LogicalDatastoreType.CONFIGURATION, DOMAINS_IID);
+ Futures.addCallback(dao, new FutureCallback<Optional<Domains>>() {
+ @Override
+ public void onSuccess(final Optional<Domains> result) {
+ if (!result.isPresent()) {
+ logger.warn("No result!!!");
+ return;
+ }
+ getNewConfig(result);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ logger.error("Failed to read configuration", t);
+ }
+ }, executor);
+ }
- /*
- * Create a list of new servers by skipping any servers in the
- * list that are already configured (i.e. same IP/socket and set
- * of roles) -- no need to take them down
- */
- for ( OpflexRpcServer srv : serverList ) {
- OpflexRpcServer s = opflexServers.get(srv.getId());
- if (s != null && s.getRoles().containsAll(srv.getRoles())) {
- continue;
- }
- addList.add(srv.getId());
- }
+ private void getNewConfig(final Optional<Domains> result) {
+
+ List<String> currentDomains = new ArrayList<String>(opflexDomains.keySet());
+ List<String> newDomains = new ArrayList<String>();
+ List<String> addList = new ArrayList<String>();
+ List <String> dropList = new ArrayList<String>();
+ List <String> updateList = new ArrayList<String>();
+
+ if (result.get() instanceof Domains) {
+
+ /*
+ * Get the new list of domains from the
+ * configuration store, and convert to a
+ * list of the actual domain names for list
+ * manipulation
+ */
+ Domains domains = (Domains)result.get();
+
+ domainList = domains.getDomain();
+ for (Domain domainObj : domainList) {
+ newDomains.add(domainObj.getId());
+ }
+
+ logger.warn("Current domains {}", currentDomains);
+ /*
+ * Find out what's changed at the domain level.
+ * Classify as additions, deletions, and updates
+ */
+ addList = new ArrayList<String>(newDomains);
+ dropList = new ArrayList<String>(currentDomains);
+ updateList = new ArrayList<String>(newDomains);
+ addList.removeAll(currentDomains);
+ dropList.removeAll(newDomains);
+ updateList.removeAll(addList);
+
+ /*
+ * Drop domains that were removed, along with all
+ * of their servers and connections
+ */
+ for (String d : dropList) {
+ deleteDomain(d);
+ }
+
+ /*
+ * These are entirely new domains -- get the
+ * information for each new domain and configure
+ */
+ for (String d : addList) {
+ OpflexDomain od = new OpflexDomain();
+ od.setDomain(d);
+ opflexDomains.put(od.getDomain(), od );
+
+ /* Spawn the servers for this domain */
+ for (Domain dl : domainList) {
+ if (dl.getId().equals(d)) {
+ od.addServers(createServerList(od, dl));
+ break;
+ }
+ }
+ }
+
+ /*
+ * These are domains with updates
+ */
+ for (String d : updateList) {
+ OpflexDomain od = opflexDomains.get(d);
+ for (Domain domainObj : domainList) {
+ if (domainObj.getId().equals(d)) {
+ logger.warn("updateServers");
+ od.updateServers(createServerList(od, domainObj));
+ break;
+ }
+ }
+ }
+ }
+ }
- /*
- * We need to find out if there are any servers that
- * we have to drop. This is the set of servers that
- * are already running but don't appear in the configured
- * list. This just requires a check against the IP/port
- * (i.e. no need to check role).
- */
- Set <String> dropSet = opflexServers.keySet();
- dropSet.removeAll(addList);
- dropList.addAll(dropSet);
+ @Override
+ public void onDataChanged( final AsyncDataChangeEvent<InstanceIdentifier<?>,
+ DataObject>change) {
- /* remove deleted servers first */
- dropServers(dropList);
- addServers(serverList);
+ readConfig();
}
@Override
@Override
public void callback(JsonRpcEndpoint endpoint, RpcMessage message) {
+ if (!(message instanceof IdentityRequest)) {
+ logger.warn("message is not identity request {}", message);
+ return;
+ }
+ OpflexRpcServer srv = getOpflexServer(endpoint);
+ if (srv == null) return;
+
+ IdentityRequest request = (IdentityRequest)message;
IdentityResponse.Result result = new IdentityResponse.Result();
List<IdentityResponse.Peer> peers =
IdentityResponse response = new IdentityResponse();
/*
- * We find our role by matching the parent Channel (couldn't
- * come up with an easier way to do this, as we're trying to
- * match against the configured identity -- decided against
- * using the channel's connection b/c things like wildcard
- * addresses make this comparison tricky). There's also a
- * minute possibility that the parent socket has been deleted
- * (e.g. due to reconfiguration) in which case, the peers list
- * will provide the updated information.
+ * We inherit our role from the server that spawned
+ * the connection.
*/
- OpflexRpcServer srv = null;
List<String> myRoles = new ArrayList<String>();
- List<OpflexRpcServer> servers =
- new ArrayList<OpflexRpcServer>(opflexServers.values());
- for (OpflexRpcServer server : servers) {
- if (server.getRpcServer().getChannel() == endpoint.getChannel().parent()) {
- /* this is our server */
- List<Role> roles = server.getRoles();
- if (roles != null) {
- for ( Role r : roles ) {
- myRoles.add(r.toString());
- }
- }
- srv = server;
- break;
+ List<Role> roles = srv.getRoles();
+ if (roles != null) {
+ for ( Role r : roles ) {
+ myRoles.add(r.toString());
}
}
result.setMy_role(myRoles);
/*
* The peers field contains the identifiers other than my_role
*/
- for (OpflexRpcServer server : servers) {
- /* Skip our server -- reported in my_role */
- if ( Objects.equals(server.getId(), srv.getId()))
- continue;
- List<Role> roles = server.getRoles();
- if (roles != null) {
- for ( Role r : roles ) {
- IdentityResponse.Peer peer = new IdentityResponse.Peer();
- peer.setConnectivity_info(server.getId());
- peer.setRole(r.toString());
- peers.add(peer);
+ OpflexDomain od = getOpflexDomain(endpoint);
+ if (request.getParams() == null || request.getParams().size() <= 0) {
+ return;
+ }
+
+ if (!request.getDomain().equals(od.getDomain())) {
+ IdentityResponse.Error error = new IdentityResponse.Error();
+ error.setMessage(INVALID_DOMAIN);
+ response.setError(error);
+ /* send domain mismatch */
+ }
+ else {
+ for (OpflexRpcServer server : od.getOpflexServerList()) {
+ /* Skip our server -- reported in my_role */
+ if ( Objects.equals(server.getId(), srv.getId()))
+ continue;
+ roles = server.getRoles();
+ if (roles != null) {
+ for ( Role r : roles ) {
+ IdentityResponse.Peer peer = new IdentityResponse.Peer();
+ peer.setConnectivity_info(server.getId());
+ peer.setRole(r.toString());
+ peers.add(peer);
+ }
}
}
+ result.setPeers(peers);
+ result.setName(srv.getId());
+ result.setDomain(od.getDomain());
+ response.setResult(result);
}
- result.setPeers(peers);
- response.setResult(result);
-
response.setId(message.getId());
/*
@Override
public void addConnection(JsonRpcEndpoint endpoint) {
- List<Role> roles = new ArrayList<Role>();
- OpflexConnection agent = new OpflexConnection();
- agent.setEndpoint(endpoint);
- agent.setIdentity(endpoint.getIdentifier());
- if (endpoint.supportsMessages(policyRepositoryMessages)) {
- roles.add(Role.POLICY_REPOSITORY);
- }
- if (endpoint.supportsMessages(endpointRegistryMessages)) {
- roles.add(Role.ENDPOINT_REGISTRY);
- }
- if (endpoint.supportsMessages(observerMessages)) {
- roles.add(Role.OBSERVER);
+ /*
+ * When the connection is added, we don't have a context
+ * other than the JsonRpcEndpoint. We use the context
+ * field to store the server object that created this
+ * connection, and can look up things like the domain,
+ * etc. to create the containing connection object.
+ */
+ if (!(endpoint.getContext() instanceof OpflexRpcServer)) {
+ logger.error("Connection for endpoint {} invalid",
+ endpoint.getIdentifier());
+ // TODO: close connection?
+ return;
}
- agent.setRoles(roles);
+
+ OpflexRpcServer server = (OpflexRpcServer)endpoint.getContext();
+ OpflexDomain domain = server.getDomain();
+
+
+ /*
+ * This is the notification when a new endpoint
+ * has been created. Since the endpoint is new,
+ * we don't have a OpflexConnection for it yet. We
+ * create the OpflexConnection, then get the
+ * OpflexRpcServer to set some of the fields
+ * we need (domain, server).
+ */
+ OpflexAgent oc = new OpflexAgent();
+ oc.setEndpoint(endpoint);
+ oc.setIdentity(endpoint.getIdentifier());
+ oc.setDomain(domain.getDomain());
+ oc.setOpflexServer(server);
+ oc.setRoles(server.getRoles());
+
+ /*
+ * The OpFlex domain is determined by the server socket
+ * that the agent connected to. Look up the OpFlex RPC
+ * server using the server socket.
+ *
+ * It's possible that the server was closed or changed
+ * between the connection establishment and now (race
+ * condition). Treat that as a failure, closing the
+ * connection.
+ */
logger.warn("Adding agent {}", endpoint.getIdentifier());
- opflexAgents.put(endpoint.getIdentifier(), agent);
+ domain.addOpflexAgent(oc);
}
@Override
- public void channelClosed(JsonRpcEndpoint peer) throws Exception {
- logger.info("Connection to Node : {} closed", peer.getIdentifier());
- opflexAgents.remove(peer.getIdentifier());
+ public void channelClosed(JsonRpcEndpoint endpoint) throws Exception {
+ logger.info("Connection to Node : {} closed", endpoint.getIdentifier());
+ OpflexAgent agent = getOpflexConnection(endpoint);
+ if (agent != null) {
+ OpflexDomain od = opflexDomains.get(agent.getDomain());
+ if (od != null) {
+ od.removeOpflexAgent(agent);
+ }
+ }
}
}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+
+/**
+ *
+ * An OpFlex domain is a logical grouping of OpFlex entities.
+ * The domain aggregates entites and provides methods so that they
+ * can be looked up or referenced by domain.
+ *
+ * The domain field is only present in the OpFlex Identity request message.
+ *
+ * @author tbachman
+ *
+ */
+public class OpflexDomain {
+ String domain;
+ ConcurrentMap<String, OpflexAgent> opflexAgents = null;
+ ConcurrentMap<String, OpflexRpcServer> opflexServers = null;
+
+ OpflexDomain() {
+ opflexAgents = new ConcurrentHashMap<String, OpflexAgent>();
+ opflexServers = new ConcurrentHashMap<String, OpflexRpcServer>();
+ }
+
+ public String getDomain() {
+ return domain;
+ }
+
+ public void setDomain(String domain) {
+ this.domain = domain;
+ }
+
+ public ConcurrentMap<String, OpflexAgent> getOpflexAgents() {
+ return opflexAgents;
+ }
+
+ public void setOpflexAgents(
+ ConcurrentMap<String, OpflexAgent> opflexAgents) {
+ this.opflexAgents = opflexAgents;
+ }
+
+ public ConcurrentMap<String, OpflexRpcServer> getOpflexServers() {
+ return opflexServers;
+ }
+
+ public void setOpflexServers(
+ ConcurrentMap<String, OpflexRpcServer> opflexServers) {
+ this.opflexServers = opflexServers;
+ }
+
+ public void removeOpflexAgent(OpflexAgent agent) {
+ opflexAgents.remove(agent.getIdentity());
+ }
+
+ public void removeOpflexServer(OpflexRpcServer server) {
+ opflexServers.remove(server.getId());
+ }
+
+ public List<OpflexRpcServer> getOpflexServerList() {
+ return new ArrayList<OpflexRpcServer>(opflexServers.values());
+ }
+
+ /**
+ * Clean up all the entities contained by this domain. The
+ * connection service also owns these references, so we
+ * provide notifications to the connection service so that
+ * it can clean up as well.
+ */
+ public void cleanup() {
+ List<String> agents = new ArrayList<String>(opflexAgents.keySet());
+ List<String> servers = new ArrayList<String>(opflexServers.keySet());
+ for (String agent : agents) {
+ OpflexAgent conn = opflexAgents.remove(agent);
+ conn.getEndpoint().getChannel().disconnect();
+ }
+ for (String srv : servers) {
+ OpflexRpcServer server = opflexServers.get(srv);
+ if (server.getRpcServer().getChannel() != null) {
+ server.getRpcServer().getChannel().disconnect();
+ }
+ }
+ }
+
+ /**
+ * Add an {@link OpflexAgent} to the domain
+ *
+ * @param agent The agent to add
+ */
+ public void addOpflexAgent(OpflexAgent agent) {
+ opflexAgents.put(agent.getIdentity(), agent);
+ }
+
+ /**
+ * Return the {@link OpflexAgent} associated
+ * with this identity
+ *
+ * @param identity A string representing the connections identity
+ * @return The connection represented by that key, or null if not found
+ */
+ public OpflexAgent getOpflexAgent(String identity) {
+ return opflexAgents.get(identity);
+ }
+
+ /**
+ * Add the List of servers to the domain
+ *
+ * @param serverList List of new servers to start
+ */
+ public void addServers(List<OpflexRpcServer> serverList) {
+
+ if (serverList == null) return;
+
+ /*
+ * Check to see if there's already a server
+ * with this identity, and if so, close it
+ * and replace it with this one.
+ */
+ for ( OpflexRpcServer srv: serverList ) {
+ OpflexRpcServer server = opflexServers.get(srv.getId());
+ if (server != null) {
+ if ( !server.sameServer(srv)) {
+ OpflexRpcServer oldServer = opflexServers.remove(srv.getId());
+ oldServer.getRpcServer().getChannel().disconnect();
+ opflexServers.put(srv.getId(), srv);
+ srv.start();
+ }
+ }
+ else {
+ opflexServers.put(srv.getId(), srv);
+ srv.start();
+ }
+ }
+ }
+
+ /**
+ * Drop the list of servers from the domain
+ *
+ * @param oldServers The list of servers to drop
+ *
+ * TODO: Should we provide notifications to or close
+ * the connections that were spawned by the
+ * deleted servers?
+ */
+ public void dropServers(List<String> oldServers) {
+ OpflexRpcServer server;
+
+ /*
+ * Check to see if there's already a server
+ * with this identity, and if so, close it
+ * and replace it with this one.
+ */
+ for (String srv: oldServers) {
+ if (opflexServers.containsKey(srv)) {
+ server = opflexServers.remove(srv);
+ server.getRpcServer().getChannel().disconnect();
+ }
+ }
+ }
+
+ /**
+ * Check the new configuration of the servers against the
+ * existing, and if different, delete the old server and
+ * replace it with a new server running the updated parameters.
+ *
+ * @param serverList The new server configurations
+ */
+ public void updateServers(List<OpflexRpcServer> serverList) {
+ /* Get the new list of configured servers in this domain */
+ List<OpflexRpcServer> updateServers = new ArrayList<OpflexRpcServer>();
+ List<OpflexRpcServer> newServers = new ArrayList<OpflexRpcServer>();
+ List<String> newList = new ArrayList<String>();
+
+ for (OpflexRpcServer srv : serverList) {
+ newList.add(srv.getId());
+ }
+
+ /* Get the list of currently configured servers in this domain*/
+ List<String> currentList =
+ new ArrayList<String>(opflexServers.keySet());
+
+ /* Make the add/drop/update lists */
+ List<String> addList = new ArrayList<String>(newList);
+ List<String> dropList = new ArrayList<String>(currentList);
+ List<String> updateList = new ArrayList<String>(newList);
+
+ addList.removeAll(currentList);
+ dropList.removeAll(newList);
+ updateList.removeAll(addList);
+
+ /*
+ * Create a list of new servers by skipping any servers in the
+ * list that are already configured (i.e. same IP/socket and set
+ * of roles) -- no need to take them down
+ */
+ for (OpflexRpcServer srv: serverList) {
+ /*
+ * If this in our update list, check parameters
+ * to see if we really need to update it
+ */
+ if (updateList.contains(srv.getId())) {
+ OpflexRpcServer s = opflexServers.get(srv.getId());
+ if (s != null && s.getRoles().containsAll(srv.getRoles())) {
+ continue;
+ }
+ updateServers.add(srv);
+
+ }
+ if (addList.contains(srv.getId())) {
+ newServers.add(srv);
+ }
+ }
+
+
+ dropServers(dropList);
+ addServers(newServers);
+ addServers(updateServers);
+ }
+
+
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.List;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.ConnectionService;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcBroker;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcServer;
+
+/**
+ * The {@link OpflexRpcServer}s respond to OpFlex clients
+ * which create {@link OpflexAgent} objects when they
+ * are established. The servers don't own the connections,
+ * which allows the clients to continue operation even if
+ * the server is closed
+ *
+ * @author tbachman
+ *
+ */
+public class OpflexRpcServer {
+
+ private String identity;
+ private OpflexDomain domain;
+ private List<Role> roles;
+ private RpcServer rpcServer;
+ private ConnectionService connectionService;
+ private RpcBroker rpcBroker;
+
+ private String address;
+ private int port;
+
+ private void parseAndSetIdentity(String identity) {
+ if (identity.split(":").length == 2) {
+ this.identity = identity;
+ this.address = identity.split(":")[0];
+ this.port = Integer.parseInt(identity.split(":")[1]);
+ }
+ }
+
+ public OpflexRpcServer(OpflexDomain domain, String identity, List<Role> roles) {
+ this.domain = domain;
+ this.roles = roles;
+ parseAndSetIdentity(identity);
+ rpcServer = new RpcServer(address, port);
+ rpcServer.setContext(this);
+ }
+
+ public OpflexDomain getDomain() {
+ return domain;
+ }
+
+ public String getId() {
+ return this.identity;
+ }
+
+ public RpcServer getRpcServer() {
+ return rpcServer;
+ }
+
+ public ConnectionService getConnectionService() {
+ return connectionService;
+ }
+
+ public void setConnectionService(ConnectionService service) {
+ this.connectionService = service;
+ }
+
+ public RpcBroker getRpcBroker() {
+ return this.rpcBroker;
+ }
+
+ public void setRpcBroker(RpcBroker rpcBroker) {
+ this.rpcBroker = rpcBroker;
+ }
+
+ public List<Role> getRoles() {
+ return this.roles;
+ }
+
+ /**
+ * Start the {@link OpflexRpcServer}. This adds the supported
+ * messages to the server, based on the roles that were
+ * configured. It creates an {@link RpcServer} object,
+ * passes it the context owned by the {@link OpflexRpcServer},
+ * and starts the server in its own thread.
+ *
+ * TODO: should use executor service instead?
+ */
+ public void start() {
+ rpcServer.setConnectionService(connectionService);
+ rpcServer.setRpcBroker(rpcBroker);
+
+ for ( Role role : roles ) {
+ rpcServer.addMessageList(role.getMessages());
+ }
+
+ new Thread() {
+ private RpcServer server;
+
+ public Thread initializeServerParams(RpcServer server) {
+ this.server = server;
+ return this;
+ }
+ @Override
+ public void run() {
+ try {
+ server.start();
+ } catch (Exception e) {
+ }
+ }
+ }.initializeServerParams(rpcServer).start();
+
+ }
+
+ /**
+ * Check to see if two servers are the same. They
+ * need to be in the same Opflex Domain, have the same
+ * identity, and the same roles, or they can be
+ * identical objects. Note that it purposely does
+ * not compare the RpcServer, as the purpose for
+ * this method is to see if there is already a server
+ * fulfilling this configuration (which is the reason
+ * it's a new method, instead of overriding toString).
+ *
+ * @param srv The server to compare against
+ * @return true if they are equivalent
+ */
+ public boolean sameServer(OpflexRpcServer srv) {
+ if (this == srv)
+ return true;
+ if (srv == null)
+ return false;
+ if (!this.identity.equals(srv.identity))
+ return false;
+ if (this.domain == null ||
+ !this.domain.getDomain().equals(srv.getDomain().getDomain()))
+ return false;
+ if (this.roles == null && srv.roles == null)
+ return true;
+ if (this.roles == null || srv.roles == null)
+ return false;
+ if (this.roles.size() == srv.roles.size()
+ && this.roles.containsAll(srv.roles))
+ return true;
+ return false;
+ }
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityResponse;
+
+/**
+ * Enum for OpFlex roles and their supported messages
+ *
+ * @author tbachman
+ *
+ */
+public enum Role {
+ POLICY_REPOSITORY("policy_repository"),
+ ENDPOINT_REGISTRY("endpoint_registry"),
+ OBSERVER("observer"),
+ POLICY_ELEMENT("policy_element");
+
+ static IdentityRequest idReq = new IdentityRequest();
+ static IdentityResponse idRsp = new IdentityResponse();
+
+ private final String role;
+
+ Role(String role) {
+ this.role = role;
+ }
+
+ /**
+ * Get the {@link RpcMessage}s supported by this Role
+ *
+ * @return List of RpcMessages supported for this Role
+ */
+ public List<RpcMessage> getMessages() {
+ if (role.equals(POLICY_REPOSITORY.toString())) {
+ List<RpcMessage> msgList = new ArrayList<RpcMessage>();
+ msgList.add(idReq);
+ msgList.add(idRsp);
+ return msgList;
+ }
+ else if (role.equals(ENDPOINT_REGISTRY.toString())) {
+ List<RpcMessage> msgList = new ArrayList<RpcMessage>();
+ msgList.add(idReq);
+ msgList.add(idRsp);
+ return msgList;
+ }
+ else if (role.equals(OBSERVER.toString())) {
+ List<RpcMessage> msgList = new ArrayList<RpcMessage>();
+ msgList.add(idReq);
+ msgList.add(idRsp);
+ return msgList;
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return this.role;
+ }
+}
\ No newline at end of file
*
* Authors : Thomas Bachman
*/
-package org.opendaylight.groupbasedpolicy.renderer.opflex;
+package org.opendaylight.groupbasedpolicy.renderer.opflex.messages;
import java.util.List;
*
* Authors : Thomas Bachman
*/
-package org.opendaylight.groupbasedpolicy.renderer.opflex;
+package org.opendaylight.groupbasedpolicy.renderer.opflex.messages;
import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
*
* Authors : Thomas Bachman
*/
-package org.opendaylight.groupbasedpolicy.renderer.opflex;
+package org.opendaylight.groupbasedpolicy.renderer.opflex.messages;
import java.util.List;
*
* Authors : Thomas Bachman
*/
-package org.opendaylight.groupbasedpolicy.renderer.opflex;
+package org.opendaylight.groupbasedpolicy.renderer.opflex.messages;
import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
*
* Authors : Thomas Bachman
*/
-package org.opendaylight.groupbasedpolicy.renderer.opflex;
+package org.opendaylight.groupbasedpolicy.renderer.opflex.messages;
import java.util.List;
*
* Authors : Thomas Bachman
*/
-package org.opendaylight.groupbasedpolicy.renderer.opflex;
+package org.opendaylight.groupbasedpolicy.renderer.opflex.messages;
import java.util.List;
*
* Authors : Thomas Bachman
*/
-package org.opendaylight.groupbasedpolicy.renderer.opflex;
+package org.opendaylight.groupbasedpolicy.renderer.opflex.messages;
import java.util.ArrayList;
import java.util.List;
public IdentityRequest() {
this.name = IDENTITY_MESSAGE;
}
+
+ public String getDomain() {
+ if (this.params != null && this.params.get(0) != null) {
+ return this.params.get(0).getDomain();
+ }
+ return null;
+ }
}
*
* Authors : Thomas Bachman
*/
-package org.opendaylight.groupbasedpolicy.renderer.opflex;
+package org.opendaylight.groupbasedpolicy.renderer.opflex.messages;
import java.util.ArrayList;
import java.util.List;
*
* Authors : Thomas Bachman
*/
-package org.opendaylight.groupbasedpolicy.renderer.opflex;
+package org.opendaylight.groupbasedpolicy.renderer.opflex.messages;
import java.util.List;
*
* Authors : Thomas Bachman
*/
-package org.opendaylight.groupbasedpolicy.renderer.opflex;
+package org.opendaylight.groupbasedpolicy.renderer.opflex.messages;
import java.util.List;
*
* Authors : Thomas Bachman
*/
-package org.opendaylight.groupbasedpolicy.renderer.opflex;
+package org.opendaylight.groupbasedpolicy.renderer.opflex.messages;
import java.util.List;
*
* Authors : Thomas Bachman
*/
-package org.opendaylight.groupbasedpolicy.renderer.opflex;
+package org.opendaylight.groupbasedpolicy.renderer.opflex.messages;
import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
*
* Authors : Thomas Bachman
*/
-package org.opendaylight.groupbasedpolicy.renderer.opflex;
+package org.opendaylight.groupbasedpolicy.renderer.opflex.messages;
import java.util.List;
*
* Authors : Thomas Bachman
*/
-package org.opendaylight.groupbasedpolicy.renderer.opflex;
+package org.opendaylight.groupbasedpolicy.renderer.opflex.messages;
import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
*
* Authors : Thomas Bachman
*/
-package org.opendaylight.groupbasedpolicy.renderer.opflex;
+package org.opendaylight.groupbasedpolicy.renderer.opflex.messages;
import java.util.List;
*
* Authors : Thomas Bachman
*/
-package org.opendaylight.groupbasedpolicy.renderer.opflex;
+package org.opendaylight.groupbasedpolicy.renderer.opflex.messages;
import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
/**
* The policy resolver is a utility for renderers to help in resolving
* group-based policy into a form that is easier to apply to the actual network.
- *
- * <p>For any pair of endpoint groups, there is a set of rules that could apply
+ *
+ * <p>For any pair of endpoint groups, there is a set of rules that could apply
* to the endpoints on that group based on the policy configuration. The exact
- * list of rules that apply to a given pair of endpoints depends on the
+ * list of rules that apply to a given pair of endpoints depends on the
* conditions that are active on the endpoints.
- *
- * In a more formal sense: Let there be endpoint groups G_n, and for each G_n a
- * set of conditions C_n that can apply to endpoints in G_n. Further, let S be
- * the set of lists of rules defined in the policy. Our policy can be
- * represented as a function F: (G_n, 2^C_n, G_m, 2^C_m) -> S, where 2^C_n
- * represents the power set of C_n. In other words, we want to map all the
- * possible tuples of pairs of endpoints along with their active conditions
+ *
+ * In a more formal sense: Let there be endpoint groups G_n, and for each G_n a
+ * set of conditions C_n that can apply to endpoints in G_n. Further, let S be
+ * the set of lists of rules defined in the policy. Our policy can be
+ * represented as a function F: (G_n, 2^C_n, G_m, 2^C_m) -> S, where 2^C_n
+ * represents the power set of C_n. In other words, we want to map all the
+ * possible tuples of pairs of endpoints along with their active conditions
* onto the right list of rules to apply.
- *
- * <p>We need to be able to query against this policy model, enumerate the
+ *
+ * <p>We need to be able to query against this policy model, enumerate the
* relevant classes of traffic and endpoints, and notify renderers when there
- * are changes to policy as it applies to active sets of endpoints and
+ * are changes to policy as it applies to active sets of endpoints and
* endpoint groups.
- *
+ *
* <p>The policy resolver will maintain the necessary state for all tenants
- * in its control domain, which is the set of tenants for which
+ * in its control domain, which is the set of tenants for which
* policy listeners have been registered.
- *
+ *
* @author readams
*/
public class PolicyResolver implements AutoCloseable {
private final DataBroker dataProvider;
private final ScheduledExecutorService executor;
-
+
/**
* Keep track of the current relevant policy scopes.
*/
private CopyOnWriteArrayList<PolicyScope> policyListenerScopes;
-
+
protected ConcurrentMap<TenantId, TenantContext> resolvedTenants;
-
+
private PolicyCache policyCache = new PolicyCache();
-
+
public PolicyResolver(DataBroker dataProvider,
ScheduledExecutorService executor) {
super();
// *************************
/**
- * Get the policy that currently applies to a pair of endpoints.
+ * Get the policy that currently applies to a pair of endpoints.
* with the specified groups and conditions. The first endpoint acts as
- * the consumer and the second endpoint acts as the provider, so to get
+ * the consumer and the second endpoint acts as the provider, so to get
* all policy related to this pair of endpoints you must call this
* function twice: once for each possible order of endpoints.
- *
- * @param ep1Tenant the tenant ID for the first endpoint
- * @param ep1Group the endpoint group for the first endpoint
+ *
+ * @param ep1Tenant the tenant ID for the first endpoint
+ * @param ep1Group the endpoint group for the first endpoint
* @param ep1Conds The conditions that apply to the first endpoint
* @param ep2Tenant the tenant ID for the second endpoint
- * @param ep2Group the endpoint group for the second endpoint
+ * @param ep2Group the endpoint group for the second endpoint
* @param ep2Conds The conditions that apply to the second endpoint.
* @return a list of {@link RuleGroup} that apply to the endpoints.
* Cannot be null, but may be an empty list of rulegroups
*/
public List<RuleGroup> getPolicy(TenantId ep1Tenant,
- EndpointGroupId ep1Group,
+ EndpointGroupId ep1Group,
ConditionSet ep1Conds,
TenantId ep2Tenant,
- EndpointGroupId ep2Group,
+ EndpointGroupId ep2Group,
ConditionSet ep2Conds) {
- return policyCache.getPolicy(ep1Tenant, ep1Group, ep1Conds,
+ return policyCache.getPolicy(ep1Tenant, ep1Group, ep1Conds,
ep2Tenant, ep2Group, ep2Conds);
}
if (tc == null) return null;
return tc.tenant.get();
}
-
+
/**
* Register a listener to receive update events.
* @param listener the {@link PolicyListener} object to receive the update
public PolicyScope registerListener(PolicyListener listener) {
PolicyScope ps = new PolicyScope(this, listener);
policyListenerScopes.add(ps);
-
+
return ps;
}
-
+
/**
* Remove the listener registered for the given {@link PolicyScope}.
* @param scope the scope to remove
* @see PolicyResolver#registerListener(PolicyListener)
*/
public void removeListener(PolicyScope scope) {
- policyListenerScopes.remove(scope);
+ policyListenerScopes.remove(scope);
}
// **************
// Implementation
// **************
-
+
/**
* Notify the policy listeners about a set of updated consumers
*/
private void notifyListeners(Set<EgKey> updatedConsumers) {
for (final PolicyScope scope : policyListenerScopes) {
- Set<EgKey> filtered =
+ Set<EgKey> filtered =
Sets.filter(updatedConsumers, new Predicate<EgKey>() {
@Override
public boolean apply(EgKey input) {
- return scope.contains(input.getTenantId(),
+ return scope.contains(input.getTenantId(),
input.getEgId());
}
});
}
}
}
-
+
/**
* Subscribe the resolver to updates related to a particular tenant
* Make sure that this can't be called concurrently with subscribe
if (dataProvider != null) {
registration = dataProvider
.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
- TenantUtils.tenantIid(tenantId),
+ TenantUtils.tenantIid(tenantId),
new PolicyChangeListener(tenantId),
DataChangeScope.SUBTREE);
}
context = new TenantContext(registration);
- TenantContext oldContext =
+ TenantContext oldContext =
resolvedTenants.putIfAbsent(tenantId, context);
if (oldContext != null) {
// already registered in a different thread; just use the other
// Resolve the new tenant and update atomically
final AtomicReference<IndexedTenant> tenantRef = context.tenant;
final IndexedTenant ot = tenantRef.get();
- ReadOnlyTransaction transaction =
+ ReadOnlyTransaction transaction =
dataProvider.newReadOnlyTransaction();
InstanceIdentifier<Tenant> tiid = TenantUtils.tenantIid(tenantId);
- ListenableFuture<Optional<DataObject>> unresolved;
+ ListenableFuture<Optional<Tenant>> unresolved;
unresolved = transaction.read(LogicalDatastoreType.CONFIGURATION, tiid);
-
- Futures.addCallback(unresolved, new FutureCallback<Optional<DataObject>>() {
+
+ Futures.addCallback(unresolved, new FutureCallback<Optional<Tenant>>() {
@Override
- public void onSuccess(Optional<DataObject> result) {
+ public void onSuccess(Optional<Tenant> result) {
if (!result.isPresent()) return;
Tenant t = InheritanceUtils.resolveTenant((Tenant)result.get());
updateTenant(tenantId);
} else {
// Update the policy cache and notify listeners
- Table<EgKey, EgKey, Policy> policy = resolvePolicy(t);
- Set<EgKey> updatedConsumers =
+ Table<EgKey, EgKey, Policy> policy = resolvePolicy(t);
+ Set<EgKey> updatedConsumers =
policyCache.updatePolicy(policy, policyListenerScopes);
notifyListeners(updatedConsumers);
}
}, executor);
}
-
-
+
+
/**
* Resolve the policy in three phases:
- * (1) select contracts that in scope based on contract selectors.
+ * (1) select contracts that are in scope based on contract selectors.
* (2) select subjects that are in scope for each contract based on
* matchers in clauses
* (3) resolve the set of in-scope contracts into a list of subjects that
- * apply for each pair of endpoint groups and the conditions that can
+ * apply for each pair of endpoint groups and the conditions that can
* apply for for each endpoint in those groups.
*/
protected Table<EgKey, EgKey, Policy> resolvePolicy(Tenant t) {
// select contracts that apply for the given tenant
Table<EgKey, EgKey, List<ContractMatch>> contractMatches =
selectContracts(t);
-
+
// select subjects for the matching contracts and resolve the policy
// for endpoint group pairs. This does phase (2) and (3) as one step
return selectSubjects(contractMatches);
}
-
+
/**
* Choose the contracts that are in scope for each pair of endpoint
* groups, then perform subject selection for the pair
*/
- protected Table<EgKey, EgKey, List<ContractMatch>>
+ protected Table<EgKey, EgKey, List<ContractMatch>>
selectContracts(Tenant tenant) {
- // For each endpoint group, match consumer selectors
+ // For each endpoint group, match consumer selectors
// against contracts to get a set of matching consumer selectors
- Table<TenantId, ContractId, List<ConsumerContractMatch>> consumerMatches =
+ Table<TenantId, ContractId, List<ConsumerContractMatch>> consumerMatches =
HashBasedTable.create();
if (tenant.getEndpointGroup() == null) return HashBasedTable.create();
for (EndpointGroup group : tenant.getEndpointGroup()) {
- List<ConsumerContractMatch> r =
+ List<ConsumerContractMatch> r =
matchConsumerContracts(tenant, group);
for (ConsumerContractMatch ccm : r) {
- List<ConsumerContractMatch> cms =
- consumerMatches.get(tenant.getId(),
+ List<ConsumerContractMatch> cms =
+ consumerMatches.get(tenant.getId(),
ccm.contract.getId());
if (cms == null) {
cms = new ArrayList<>();
- consumerMatches.put(tenant.getId(),
+ consumerMatches.put(tenant.getId(),
ccm.contract.getId(), cms);
}
cms.add(ccm);
}
}
-
+
// Match provider selectors, and check each match for a corresponding
// consumer selector match.
- Table<EgKey, EgKey, List<ContractMatch>> contractMatches =
+ Table<EgKey, EgKey, List<ContractMatch>> contractMatches =
HashBasedTable.create();
for (EndpointGroup group : tenant.getEndpointGroup()) {
- List<ContractMatch> matches =
+ List<ContractMatch> matches =
matchProviderContracts(tenant, group, consumerMatches);
for (ContractMatch cm : matches) {
- EgKey consumerKey = new EgKey(cm.consumerTenant.getId(),
+ EgKey consumerKey = new EgKey(cm.consumerTenant.getId(),
cm.consumer.getId());
- EgKey providerKey = new EgKey(cm.providerTenant.getId(),
+ EgKey providerKey = new EgKey(cm.providerTenant.getId(),
cm.provider.getId());
List<ContractMatch> egPairMatches =
contractMatches.get(consumerKey, providerKey);
}
return contractMatches;
}
-
+
private boolean clauseMatches(Clause clause, ContractMatch match) {
if (clause.getConsumerMatchers() != null) {
- List<RequirementMatcher> reqMatchers =
+ List<RequirementMatcher> reqMatchers =
clause.getConsumerMatchers().getRequirementMatcher();
if (reqMatchers != null) {
for (RequirementMatcher reqMatcher : reqMatchers) {
- if (!MatcherUtils.applyReqMatcher(reqMatcher,
+ if (!MatcherUtils.applyReqMatcher(reqMatcher,
match.consumerRelator)) {
return false;
}
}
}
if (clause.getProviderMatchers() != null) {
- List<CapabilityMatcher> capMatchers =
+ List<CapabilityMatcher> capMatchers =
clause.getProviderMatchers().getCapabilityMatcher();
if (capMatchers != null) {
for (CapabilityMatcher capMatcher : capMatchers) {
- if (!MatcherUtils.applyCapMatcher(capMatcher,
+ if (!MatcherUtils.applyCapMatcher(capMatcher,
match.providerRelator)) {
return false;
}
ImmutableSet.Builder<ConditionName> allb = ImmutableSet.builder();
ImmutableSet.Builder<ConditionName> noneb = ImmutableSet.builder();
- ImmutableSet.Builder<Set<ConditionName>> anyb =
+ ImmutableSet.Builder<Set<ConditionName>> anyb =
ImmutableSet.builder();
for (ConditionMatcher condMatcher : condMatchers) {
if (condMatcher.getCondition() == null)
MatchType type = condMatcher.getMatchType();
if (type == null) type = MatchType.All;
if (type.equals(MatchType.Any)) {
- ImmutableSet.Builder<ConditionName> a =
+ ImmutableSet.Builder<ConditionName> a =
ImmutableSet.builder();
for (Condition c : condMatcher.getCondition()) {
a.add(c.getName());
}
anyb.add(a.build());
- } else {
+ } else {
for (Condition c : condMatcher.getCondition()) {
switch (type) {
case Any:
}
return new ConditionSet(allb.build(), noneb.build(), anyb.build());
}
-
+
private ConditionSet buildConsConditionSet(Clause clause) {
if (clause.getConsumerMatchers() != null) {
List<ConditionMatcher> condMatchers =
}
return ConditionSet.EMPTY;
}
-
+
private Policy resolvePolicy(Tenant contractTenant,
Contract contract,
Policy merge,
Table<ConditionSet, ConditionSet, List<Subject>> subjectMap) {
- Table<ConditionSet, ConditionSet, List<RuleGroup>> ruleMap =
+ Table<ConditionSet, ConditionSet, List<RuleGroup>> ruleMap =
HashBasedTable.create();
if (merge != null) {
ruleMap.putAll(merge.ruleMap);
}
- for (Cell<ConditionSet, ConditionSet, List<Subject>> entry :
+ for (Cell<ConditionSet, ConditionSet, List<Subject>> entry :
subjectMap.cellSet()) {
List<RuleGroup> rules = new ArrayList<>();
- List<RuleGroup> oldrules =
+ List<RuleGroup> oldrules =
ruleMap.get(entry.getRowKey(), entry.getColumnKey());
if (oldrules != null) {
rules.addAll(oldrules);
rules.add(rg);
}
Collections.sort(rules);
- ruleMap.put(entry.getRowKey(), entry.getColumnKey(),
+ ruleMap.put(entry.getRowKey(), entry.getColumnKey(),
Collections.unmodifiableList(rules));
}
return new Policy(ruleMap);
}
-
+
/**
- * Choose the set of subjects that in scope for each possible set of
+ * Choose the set of subjects that are in scope for each possible set of
* endpoint conditions
*/
- protected Table<EgKey, EgKey, Policy>
- selectSubjects(Table<EgKey, EgKey,
+ protected Table<EgKey, EgKey, Policy>
+ selectSubjects(Table<EgKey, EgKey,
List<ContractMatch>> contractMatches) {
// Note that it's possible to further simplify the resulting policy
// in the case of things like repeated rules, condition sets that
List<Subject> subjectList = match.contract.getSubject();
if (subjectList == null) continue;
-
+
EgKey ckey = new EgKey(match.consumerTenant.getId(),
match.consumer.getId());
EgKey pkey = new EgKey(match.providerTenant.getId(),
}
if (alreadyMatched) continue;
}
-
+
HashMap<SubjectName, Subject> subjects = new HashMap<>();
for (Subject s : subjectList) {
subjects.put(s.getName(), s);
}
-
- Table<ConditionSet, ConditionSet, List<Subject>> subjectMap =
+
+ Table<ConditionSet, ConditionSet, List<Subject>> subjectMap =
HashBasedTable.create();
-
+
for (Clause clause : clauses) {
if (clause.getSubjectRefs() != null &&
clauseMatches(clause, match)) {
ConditionSet consCSet = buildConsConditionSet(clause);
ConditionSet provCSet = buildProvConditionSet(clause);
- List<Subject> clauseSubjects =
+ List<Subject> clauseSubjects =
subjectMap.get(consCSet, provCSet);
if (clauseSubjects == null) {
clauseSubjects = new ArrayList<>();
}
}
- policy.put(ckey, pkey,
- resolvePolicy(match.contractTenant,
+ policy.put(ckey, pkey,
+ resolvePolicy(match.contractTenant,
match.contract,
- existing,
+ existing,
subjectMap));
}
}
-
+
return policy;
}
-
+
private List<ConsumerContractMatch> matchConsumerContracts(Tenant tenant,
EndpointGroup consumer) {
List<ConsumerContractMatch> matches = new ArrayList<>();
for (ConsumerNamedSelector cns : consumer.getConsumerNamedSelector()) {
if (cns.getContract() == null) continue;
for (ContractId contractId : cns.getContract()) {
- Contract contract =
+ Contract contract =
TenantUtils.findContract(tenant, contractId);
if (contract == null) continue;
- matches.add(new ConsumerContractMatch(tenant, contract,
- tenant, consumer,
+ matches.add(new ConsumerContractMatch(tenant, contract,
+ tenant, consumer,
cns));
}
}
}
}
if (match) {
- matches.add(new ConsumerContractMatch(tenant,
- contract,
- tenant,
- consumer,
+ matches.add(new ConsumerContractMatch(tenant,
+ contract,
+ tenant,
+ consumer,
cts));
}
}
// for (ConsumerTargetSelector cts : consumer.getConsumerTargetSelector()) {
// if (tenant.getContractRef() == null) continue;
// for (ContractRef c : tenant.getContractRef()) {
-//
+//
// }
// }
return matches;
private void amendContractMatches(List<ContractMatch> matches,
List<ConsumerContractMatch> cMatches,
- Tenant tenant, EndpointGroup provider,
+ Tenant tenant, EndpointGroup provider,
ProviderSelectionRelator relator) {
if (cMatches == null) return;
for (ConsumerContractMatch cMatch : cMatches) {
matches.add(new ContractMatch(cMatch, tenant, provider, relator));
}
}
-
- private List<ContractMatch>
+
+ private List<ContractMatch>
matchProviderContracts(Tenant tenant, EndpointGroup provider,
- Table<TenantId,
- ContractId,
+ Table<TenantId,
+ ContractId,
List<ConsumerContractMatch>> consumerMatches) {
List<ContractMatch> matches = new ArrayList<>();
if (provider.getProviderNamedSelector() != null) {
for (ContractId contractId : pns.getContract()) {
Contract c = TenantUtils.findContract(tenant, contractId);
if (c == null) continue;
- List<ConsumerContractMatch> cMatches =
+ List<ConsumerContractMatch> cMatches =
consumerMatches.get(tenant.getId(), c.getId());
amendContractMatches(matches, cMatches, tenant, provider, pns);
}
}
}
if (match) {
- List<ConsumerContractMatch> cMatches =
- consumerMatches.get(tenant.getId(),
+ List<ConsumerContractMatch> cMatches =
+ consumerMatches.get(tenant.getId(),
c.getId());
- amendContractMatches(matches, cMatches, tenant,
+ amendContractMatches(matches, cMatches, tenant,
provider, pts);
}
ListenerRegistration<DataChangeListener> registration;
AtomicReference<IndexedTenant> tenant = new AtomicReference<>();
-
+
public TenantContext(ListenerRegistration<DataChangeListener> registration) {
super();
this.registration = registration;
}
}
-
+
/**
* Represents a selected contract made by endpoint groups matching it
* using selection relators. This is the result of the contract selection
* The tenant ID of the provider endpoint group
*/
final Tenant providerTenant;
-
+
/**
* The provider endpoint group
*/
final EndpointGroup provider;
-
+
/**
* The provider selection relator that was used to match the contract
*/
public ContractMatch(ConsumerContractMatch consumerMatch,
Tenant providerTenant, EndpointGroup provider,
ProviderSelectionRelator providerRelator) {
- super(consumerMatch.contractTenant,
- consumerMatch.contract,
+ super(consumerMatch.contractTenant,
+ consumerMatch.contract,
consumerMatch.consumerTenant,
- consumerMatch.consumer,
+ consumerMatch.consumer,
consumerMatch.consumerRelator);
this.providerTenant = providerTenant;
this.provider = provider;
* The tenant of the matching contract
*/
final Tenant contractTenant;
-
+
/**
* The matching contract
*/
* The tenant for the endpoint group
*/
final Tenant consumerTenant;
-
+
/**
* The consumer endpoint group
*/
final EndpointGroup consumer;
-
+
/**
* The consumer selection relator that was used to match the contract
*/
final ConsumerSelectionRelator consumerRelator;
-
+
public ConsumerContractMatch(Tenant contractTenant,
Contract contract,
@Immutable
private class PolicyChangeListener implements DataChangeListener {
final TenantId tenantId;
-
+
public PolicyChangeListener(TenantId tenantId) {
super();
this.tenantId = tenantId;
@Override
public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> arg0) {
- updateTenant(tenantId);
+ updateTenant(tenantId);
}
-
+
}
}
}
}
- // ******************
- // Configuration Data
- // ******************
- container discovery-definitions {
+ container domains {
description
- "The nodes that any OpFlex agent needs to communicate
- with in an OpFlex policy fabric.";
+ "The list of all OpFlex domains.";
- list policy-repository {
- description
- "A repository that the OpFlex agent can use for resolving
- policies.";
-
- key "id";
+ config true;
- leaf id {
- description
- "The id for the policy repository.";
- type string;
- config true;
- }
- leaf port {
- description
- "The port number to use for the connection";
- type int32;
- config true;
- }
- leaf serialization-type {
- description
- "The serialization to use for this connection.";
- type serialization;
- config true;
- }
- }
-
- list endpoint-registry {
+ list domain {
description
- "A repository that the OpFlex agent can use for registration
- and lookup of endpoints.";
+ "An administrative domain for OpFlex entities.";
key "id";
leaf id {
description
- "The id for the endpoint registry.";
+ "The id for the domain.";
type string;
config true;
}
- leaf port {
- description
- "The port number to use for the connection";
- type int32;
- config true;
- }
- leaf serialization-type {
- description
- "The serialization to use for this connection.";
- type serialization;
- config true;
- }
- }
-
- list observer {
- description
- "A repository that the OpFlex agent can send State Report
- messages to.";
-
- key "id";
- leaf id {
- description
- "The id for the Observer.";
- type string;
- config true;
- }
- leaf port {
- description
- "The port number to use for the connection";
- type int32;
- config true;
- }
- leaf serialization-type {
- description
- "The serialization to use for this connection.";
- type serialization;
- config true;
+ // ******************
+ // Configuration Data
+ // ******************
+ container discovery-definitions {
+ description
+ "The nodes that any OpFlex agent needs to communicate
+ with in an OpFlex policy fabric.";
+
+ list policy-repository {
+ description
+ "A repository that the OpFlex agent can use for resolving
+ policies.";
+
+ key "id";
+
+ leaf id {
+ description
+ "The id for the policy repository.";
+ type string;
+ config true;
+ }
+ leaf port {
+ description
+ "The port number to use for the connection";
+ type int32;
+ config true;
+ }
+ leaf serialization-type {
+ description
+ "The serialization to use for this connection.";
+ type serialization;
+ config true;
+ }
+ }
+
+ list endpoint-registry {
+ description
+ "A repository that the OpFlex agent can use for registration
+ and lookup of endpoints.";
+
+ key "id";
+
+ leaf id {
+ description
+ "The id for the endpoint registry.";
+ type string;
+ config true;
+ }
+ leaf port {
+ description
+ "The port number to use for the connection";
+ type int32;
+ config true;
+ }
+ leaf serialization-type {
+ description
+ "The serialization to use for this connection.";
+ type serialization;
+ config true;
+ }
+ }
+
+ list observer {
+ description
+ "A repository that the OpFlex agent can send State Report
+ messages to.";
+
+ key "id";
+
+ leaf id {
+ description
+ "The id for the Observer.";
+ type string;
+ config true;
+ }
+ leaf port {
+ description
+ "The port number to use for the connection";
+ type int32;
+ config true;
+ }
+ leaf serialization-type {
+ description
+ "The serialization to use for this connection.";
+ type serialization;
+ config true;
+ }
+ }
}
}
}
import static io.netty.buffer.Unpooled.copiedBuffer;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.netty.channel.embedded.EmbeddedChannel;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcDecoder;
import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcServiceBinderHandler;
import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessageMap;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DiscoveryDefinitions;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DiscoveryDefinitionsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.EndpointRegistry;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.EndpointRegistryBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.Observer;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.ObserverBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.PolicyRepository;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.PolicyRepositoryBuilder;
-import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityResponse;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.Domains;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.Domain;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.DiscoveryDefinitions;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.DiscoveryDefinitionsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.discovery.definitions.EndpointRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.discovery.definitions.EndpointRegistryBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.discovery.definitions.Observer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.discovery.definitions.ObserverBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.discovery.definitions.PolicyRepository;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.domains.domain.discovery.definitions.PolicyRepositoryBuilder;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OpflexConnectionServiceTest {
protected static final Logger logger = LoggerFactory.getLogger(OpflexMessageTest.class);
- static private final String TEST_RPC_MESSAGE_NAME = "test_message";
-
static private final String TEST_EP_UUID = "85d53c32-47af-4eaf-82fd-ced653ff74da";
- static private final String TEST_ID_UUID = "788950f6-2279-4ae1-820e-d277cea3623c";
static public final String TEST_IP = "127.0.0.1";
static public final String TEST_PORT = "57563";
static private final String ID_UUID = "2da9e3d7-0bbe-4099-b343-12783777452f";
static private final String SEND_IDENTITY = "send_identity";
- static private final String POLICY_REQUEST = "resolve_policy";
- static private final String DOMAIN_UUID = "75caaff2-cb4f-4509-b45e-47b447cb35a9";
+ static private final String DOMAIN_UUID = "default";
static private final String NAME = "vm1";
static private final String IDENTITY = "192.168.0.1:56732";
static private final String opflexIdentityRequest =
" \"params\": [ {" +
" \"name\": \"" + NAME + "\"," +
" \"domain\": \"" + DOMAIN_UUID + "\"," +
- " \"my_role\": [\"" + OpflexConnectionService.Role.POLICY_ELEMENT.toString() + "\"]" +
+ " \"my_role\": [\"" + Role.POLICY_ELEMENT.toString() + "\"]" +
" }] }";
@Mock
@Mock
private ReadOnlyTransaction mockRead;
@Mock
- private ListenableFuture<Optional<DataObject>> mockOption;
+ private WriteTransaction mockWrite;
+ @Mock
+ private ListenableFuture<Optional<Domains>> mockOption;
+ @Mock
+ ListenableFuture<RpcResult<TransactionStatus>> mockStatus;
+ @Mock
+ private Optional<Domains> mockDao;
+ @Mock
+ private Domains mockDomains;
+ @Mock
+ private Domain mockDomain;
@Mock
- private Optional<DataObject> mockDao;
+ private OpflexDomain mockOpflexDomain;
+ @Mock
+ private OpflexRpcServer mockOpflexServer;
+ @Mock
+ private OpflexAgent mockAgent;
@Before
public void setUp() throws Exception {
* Mocks
*/
when(mockDataBroker.newReadOnlyTransaction()).thenReturn(mockRead);
- when(mockRead.read(LogicalDatastoreType.CONFIGURATION, OpflexConnectionService.
- DISCOVERY_DEFINITIONS_IID)).thenReturn(mockOption);
+ when(mockDataBroker.newWriteOnlyTransaction()).thenReturn(mockWrite);
+ when(mockWrite.commit()).thenReturn(mockStatus);
+ when(mockRead.read(LogicalDatastoreType.CONFIGURATION,
+ OpflexConnectionService.DOMAINS_IID)).thenReturn(mockOption);
when(mockOption.get()).thenReturn(mockDao);
- when(mockDao.get()).thenReturn(dummyDefinitions);
+ when(mockDao.get()).thenReturn(mockDomains);
+ when(mockDomains.getDomain())
+ .thenReturn(new ArrayList<Domain>() {{ add(mockDomain); }});
+ when(mockDomain.getDiscoveryDefinitions()).thenReturn(dummyDefinitions);
/*
* Builders for creating our own discovery definitions
System.setProperty(OpflexConnectionService.OPFLEX_LISTENIP, TEST_IP);
}
-
//@Test
public void testNoDefinitions() throws Exception {
//@Test
public void testAddConnection() throws Exception {
- opflexService = new OpflexConnectionService();
- opflexService.setDataProvider(mockDataBroker);
-
- when(mockEp.supportsMessages(opflexService.
- policyRepositoryMessages)).thenReturn(true);
when(mockEp.getIdentifier()).thenReturn(TEST_EP_UUID);
-
+ when(mockEp.getContext()).thenReturn(mockOpflexServer);
+ when(mockOpflexServer.getDomain()).thenReturn(mockOpflexDomain);
+ when(mockOpflexDomain.getDomain()).thenReturn(DOMAIN_UUID);
opflexService = new OpflexConnectionService();
opflexService.setDataProvider(mockDataBroker);
opflexService.addConnection(mockEp);
- verify(mockEp, Mockito.times(3)).supportsMessages(opflexService.policyRepositoryMessages);
- verify(mockEp, Mockito.times(3)).getIdentifier();
- assertTrue(opflexService.opflexAgents.size() == 1);
+ verify(mockEp, Mockito.times(2)).getIdentifier();
+ verify(mockOpflexDomain, Mockito.times(1)).addOpflexAgent((OpflexAgent)anyObject());
}
//@Test
public void testChannelClosed() throws Exception {
- opflexService = new OpflexConnectionService();
- opflexService.setDataProvider(mockDataBroker);
-
- JsonRpcEndpoint mockEp = mock(JsonRpcEndpoint.class);
-
- when(mockEp.supportsMessages(opflexService.
- policyRepositoryMessages)).thenReturn(true);
when(mockEp.getIdentifier()).thenReturn(TEST_EP_UUID);
+ when(mockEp.getContext()).thenReturn(mockOpflexServer);
+ when(mockOpflexDomain.getDomain()).thenReturn(DOMAIN_UUID);
+ when(mockAgent.getDomain()).thenReturn(OpflexConnectionService.OPFLEX_DOMAIN);
opflexService = new OpflexConnectionService();
opflexService.setDataProvider(mockDataBroker);
+ when(mockOpflexServer.getDomain()).
+ thenReturn(opflexService.opflexDomains.get(OpflexConnectionService.OPFLEX_DOMAIN));
opflexService.addConnection(mockEp);
- assertTrue(opflexService.opflexAgents.size() == 1);
+
+ verify(mockEp, Mockito.times(2)).getIdentifier();
+
+ when(mockOpflexServer.getDomain()).thenReturn(mockOpflexDomain);
+ when(mockOpflexDomain.getOpflexAgent(TEST_EP_UUID)).thenReturn(mockAgent);
+ when(mockAgent.getDomain()).thenReturn(OpflexConnectionService.OPFLEX_DOMAIN);
+ when(mockAgent.getIdentity()).thenReturn(TEST_EP_UUID);
opflexService.channelClosed(mockEp);
- assertTrue(opflexService.opflexAgents.size() == 0);
+ verify(mockAgent).getIdentity();
}
//@Test
public void testPublishSubscribeCallback() throws Exception {
+ List<Role> testRoles = new ArrayList<Role>();
+ testRoles.add(Role.POLICY_REPOSITORY);
+ testRoles.add(Role.ENDPOINT_REGISTRY);
+ testRoles.add(Role.OBSERVER);
+
/*
* This is *far* from UT, but worthwhile for now
*/
EmbeddedChannel channel = new EmbeddedChannel(decoder, binderHandler);
RpcMessageMap messageMap = new RpcMessageMap();
- IdentityRequest rpcMsg = new IdentityRequest();
- messageMap.add(rpcMsg);
+ messageMap.addList(Role.POLICY_REPOSITORY.getMessages());
+
JsonRpcEndpoint ep = new JsonRpcEndpoint(IDENTITY , opflexService,
objectMapper, channel, messageMap, opflexService);
+ ep.setContext(mockOpflexServer);
binderHandler.setEndpoint(ep);
+
+ when(mockOpflexServer.getRoles()).thenReturn(testRoles);
+ when(mockOpflexServer.getDomain()).
+ thenReturn(opflexService.opflexDomains.get(OpflexConnectionService.OPFLEX_DOMAIN));
opflexService.addConnection(ep);
channel.writeInbound(copiedBuffer(opflexIdentityRequest, CharsetUtil.UTF_8));
Object result = channel.readOutbound();
IdentityResponse resp = objectMapper.readValue(result.toString(), IdentityResponse.class);
assertTrue(result != null);
assertTrue(resp.getResult().getMy_role()
- .contains(OpflexConnectionService.Role.ENDPOINT_REGISTRY.toString()));
+ .contains(Role.ENDPOINT_REGISTRY.toString()));
assertTrue(resp.getResult().getMy_role()
- .contains(OpflexConnectionService.Role.POLICY_REPOSITORY.toString()));
+ .contains(Role.POLICY_REPOSITORY.toString()));
assertTrue(resp.getResult().getMy_role()
- .contains(OpflexConnectionService.Role.OBSERVER.toString()));
+ .contains(Role.OBSERVER.toString()));
}
}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import io.netty.channel.Channel;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Unit tests for {@link OpflexDomain}: adding, dropping, and updating
+ * OpFlex RPC servers within a domain.
+ */
+public class OpflexDomainTest {
+ protected static final Logger logger = LoggerFactory.getLogger(OpflexMessageTest.class);
+
+ private static final String TEST_DOMAIN = "default";
+ private static final String TEST_ID = "localhost:6671";
+ private OpflexDomain testDomain;
+ private List<Role> dummyRoles = null;
+
+ @Mock
+ private OpflexRpcServer mockServer;
+ @Mock
+ private Channel mockChannel;
+ @Mock
+ private RpcServer mockRpcServer;
+
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+
+ dummyRoles = new ArrayList<Role>();
+ dummyRoles.add(Role.POLICY_REPOSITORY);
+
+ testDomain = new OpflexDomain();
+ testDomain.setDomain(TEST_DOMAIN);
+ when(mockServer.getRpcServer()).thenReturn(mockRpcServer);
+ when(mockRpcServer.getChannel()).thenReturn(mockChannel);
+ when(mockServer.getId()).thenReturn(TEST_ID);
+ when(mockServer.sameServer((OpflexRpcServer)anyObject())).thenReturn(false);
+ when(mockServer.getRoles()).thenReturn(dummyRoles);
+ }
+
+ @Test
+ public void testAddServers() throws Exception {
+ List<OpflexRpcServer> servers = new ArrayList<OpflexRpcServer>();
+ servers.add(mockServer);
+ testDomain.addServers(servers);
+ verify(mockServer).start();
+ }
+
+ @Test
+ public void testDropServers() throws Exception {
+ List<OpflexRpcServer> servers = new ArrayList<OpflexRpcServer>();
+ servers.add(mockServer);
+ testDomain.addServers(servers);
+
+ List<String> dropList = new ArrayList<String>();
+ dropList.add(TEST_ID);
+ testDomain.dropServers(dropList);
+ verify(mockServer).getRpcServer();
+ verify(mockRpcServer).getChannel();
+ verify(mockChannel).disconnect();
+ }
+
+ @Test
+ public void testAddDuplicateServer() throws Exception {
+ List<OpflexRpcServer> servers = new ArrayList<OpflexRpcServer>();
+ servers.add(mockServer);
+ testDomain.addServers(servers);
+ testDomain.addServers(servers);
+ verify(mockServer).getRpcServer();
+ verify(mockRpcServer).getChannel();
+ verify(mockChannel).disconnect();
+ }
+
+ @Test
+ public void testUpdateServers() throws Exception {
+ List<OpflexRpcServer> servers = new ArrayList<OpflexRpcServer>();
+ servers.add(mockServer);
+ testDomain.addServers(servers);
+ testDomain.addServers(servers);
+ verify(mockServer).getRpcServer();
+ verify(mockRpcServer).getChannel();
+ verify(mockChannel).disconnect();
+
+ }
+}
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointDeclarationRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointDeclarationResponse;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointPolicyUpdateRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointPolicyUpdateResponse;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointRequestRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.EndpointRequestResponse;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityResponse;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.PolicyResolutionRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.PolicyResolutionResponse;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.PolicyTriggerRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.PolicyTriggerResponse;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.PolicyUpdateRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.PolicyUpdateResponse;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.StateReportRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.StateReportResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.groupbasedpolicy.jsonrpc.ConnectionService;
+import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ *
+ */
+public class OpflexRpcServerTest implements ConnectionService {
+ protected static final Logger logger = LoggerFactory.getLogger(OpflexMessageTest.class);
+ private static final String TEST_IDENTITY = "localhost:6671";
+ private static final String TEST_IDENTITY2 = "localhost:6672";
+ private static final String TEST_DOMAIN = "default";
+
+ private OpflexRpcServer testServer = null;
+ private OpflexRpcServer ts1 = null;
+ private OpflexRpcServer ts2 = null;
+ private OpflexRpcServer ts3 = null;
+ private List<Role> roles = null;
+
+ @Mock
+ private OpflexDomain mockDomain;
+ @Mock
+ private RpcServer mockServer;
+ @Mock
+ private OpflexConnectionService mockService;
+
+ @Override
+ public void addConnection(JsonRpcEndpoint endpoint) {
+ }
+
+ @Override
+ public void channelClosed(JsonRpcEndpoint endpoint) throws Exception {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ roles = new ArrayList<Role>();
+ roles.add(Role.POLICY_REPOSITORY);
+
+ testServer =
+ new OpflexRpcServer(mockDomain, TEST_IDENTITY, roles);
+ testServer.setRpcBroker(mockService);
+ testServer.setConnectionService(mockService);
+
+ ts1 = new OpflexRpcServer(mockDomain, TEST_IDENTITY, roles);
+ ts2 = new OpflexRpcServer(mockDomain, TEST_IDENTITY2, roles);
+ roles = new ArrayList<Role>();
+ roles.add(Role.POLICY_ELEMENT);
+ ts3 = new OpflexRpcServer(mockDomain, TEST_IDENTITY2, roles);
+ when(mockDomain.getDomain()).thenReturn(TEST_DOMAIN);
+ }
+
+
+ @Test
+ public void testStart() throws Exception {
+ testServer.start();
+ assertTrue(testServer.getRpcServer() != null);
+ }
+
+ @Test
+ public void testSameServer() throws Exception {
+ assertTrue(testServer.sameServer(ts1));
+ assertFalse(testServer.sameServer(ts2));
+ assertFalse(testServer.sameServer(ts3));
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityRequest;
+import org.opendaylight.groupbasedpolicy.renderer.opflex.messages.IdentityResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ *
+ */
+public class RoleTest {
+ protected static final Logger logger = LoggerFactory.getLogger(RoleTest.class);
+
+ private boolean idReq;
+ private boolean idRsp;
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ @Test
+ public void testPolicyRepository() throws Exception {
+ idReq = false;
+ idRsp = false;
+
+ List<RpcMessage> messages = Role.POLICY_REPOSITORY.getMessages();
+ for (RpcMessage msg : messages) {
+ if (msg instanceof IdentityRequest) {
+ idReq = true;
+ }
+ if (msg instanceof IdentityResponse) {
+ idRsp = true;
+ }
+ }
+ assertTrue(idReq == true);
+ assertTrue(idRsp == true);
+ }
+
+ @Test
+ public void testEndpointRegistry() throws Exception {
+ idReq = false;
+ idRsp = false;
+
+ List<RpcMessage> messages = Role.ENDPOINT_REGISTRY.getMessages();
+ for (RpcMessage msg : messages) {
+ if (msg instanceof IdentityRequest) {
+ idReq = true;
+ }
+ if (msg instanceof IdentityResponse) {
+ idRsp = true;
+ }
+ }
+ assertTrue(idReq == true);
+ assertTrue(idRsp == true);
+ }
+
+ @Test
+ public void testObserver() throws Exception {
+ idReq = false;
+ idRsp = false;
+
+ List<RpcMessage> messages = Role.OBSERVER.getMessages();
+ for (RpcMessage msg : messages) {
+ if (msg instanceof IdentityRequest) {
+ idReq = true;
+ }
+ if (msg instanceof IdentityResponse) {
+ idRsp = true;
+ }
+ }
+ assertTrue(idReq == true);
+ assertTrue(idRsp == true);
+ }
+
+}
\ No newline at end of file