<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.codehaus.mojo</groupId>
<artifactId>cobertura-maven-plugin</artifactId>
<version>2.6</version>
+ <configuration>
+ <formats>
+ <format>html</format>
+ <format>xml</format>
+ </formats>
+ </configuration>
</plugin>
</plugins>
</reporting>
package org.opendaylight.controller.config.yang.config.opflex_provider.impl;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.groupbasedpolicy.renderer.opflex.OpflexConnectionService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
public class OpflexProviderModule extends org.opendaylight.controller.config.yang.config.opflex_provider.impl.AbstractOpflexProviderModule {
public OpflexProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
@Override
public java.lang.AutoCloseable createInstance() {
final OpflexConnectionService connectionService = new OpflexConnectionService();
- connectionService.setDataProvider(getDataBrokerDependency());
+ DataBroker dataBrokerService = getDataBrokerDependency();
+
+ connectionService.setDataProvider(dataBrokerService);
+ final ListenerRegistration<DataChangeListener> dataChangeListenerRegistration =
+ dataBrokerService
+ .registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
+ OpflexConnectionService.DISCOVERY_DEFINITIONS_IID,
+ connectionService, DataChangeScope.SUBTREE );
final class AutoCloseableConnectionService implements AutoCloseable {
@Override
public void close() throws Exception {
connectionService.stopping();
+ dataChangeListenerRegistration.close();
}
}
return new AutoCloseableConnectionService();
*
* Generated from: yang module name: opflex-provider-impl yang module local name: opflex-provider-impl
* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
-* Generated at: Wed Jun 11 17:14:35 UTC 2014
+* Generated at: Mon Jul 07 21:34:41 UTC 2014
*
* Do not modify this file unless it is present under src/main directory
*/
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.jsonrpc;
+
+
+/*
+ * An interface to provide notifications when connections are
+ * established or closed. The connection notifications
+ * use{@link RpcEncpoint} objects; as connections come and go,
+ * the {@link RpcEndpoint} objects associated with the connections
+ * can be long-lived
+ *
+ * @author tbachman
+ */
+public interface ConnectionService {
+ /**
+ *
+ * Indication that a new connections was established with
+ * the {@link JsonRpcEndpoint}
+ *
+ * @param endpoint The endpoint that added the connection.
+ */
+ public void addConnection(JsonRpcEndpoint endpoint);
+
+ /**
+ *
+ * Indication that a connections with the {@link JsonRpcEndpoint}
+ * was closed.
+ *
+ * @param endpoint The endpoint that closed the connection.
+ */
+ public void channelClosed(JsonRpcEndpoint endpoint) throws Exception;
+}
+++ /dev/null
-/*
- * Copyright (C) 2013 EBay Software Foundation
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- *
- * Authors : Ashwin Raveendran, Madhu Venugopal
- */
-package org.opendaylight.groupbasedpolicy.jsonrpc;
-
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-
-@JsonSerialize
-@JsonDeserialize
-public class JsonRpc10Request {
-
- String id;
- String method;
- JsonRpcMessage params;
-
- public JsonRpc10Request() {
- }
-
- public JsonRpc10Request(String id) {
- setId(id);
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getMethod() {
- return method;
- }
-
- public void setMethod(String method) {
- this.method = method;
- }
-
- public JsonRpcMessage getParams() {
- return this.params;
- }
-
- public void setParams(JsonRpcMessage params) {
- this.params = params;
- }
-
- @Override
- public String toString() {
- return "JsonRpc10Request [id=" + id + ", method=" + method
- + ", params=" + params + "]";
- }
-}
package org.opendaylight.groupbasedpolicy.jsonrpc;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.SettableFuture;
-public class JsonRpcEndpoint {
+/**
+ *
+ * This represents a JSONRPC connection between a {@link RpcServer}
+ * and some client. The clients may connect and disconnect, so one
+ * possible role that the JSONRPC endpoint can serve is to keep a long-lived
+ * notion of a client, while maintaining connectivity as it comes and goes.
+ *
+ * TODO: The current implementation uses Jackson Full data binding serialization,
+ * using JSON that has already been parsed using Jackson's Tree Model.
+ * This will be changed to streaming-mode serialization later.
+ *
+ * @author tbachman
+ *
+ */
+public class JsonRpcEndpoint implements ChannelFutureListener {
protected static final Logger logger = LoggerFactory.getLogger(JsonRpcEndpoint.class);
- public class CallContext {
- String method;
- JsonRpc10Request request;
- SettableFuture<Object> future;
+ private class CallContext {
+ private String method;
+ private RpcMessage request;
+ private SettableFuture<Object> future;
- public CallContext(JsonRpc10Request request, String method, SettableFuture<Object> future) {
+ public CallContext(RpcMessage request, String method, SettableFuture<Object> future) {
this.method = method;
this.request = request;
this.future = future;
return method;
}
- public JsonRpc10Request getRequest() {
+ public RpcMessage getRequest() {
return request;
}
}
}
- ObjectMapper objectMapper;
- Channel nettyChannel;
- Map<String, CallContext> methodContext = Maps.newHashMap();
- JsonRpcMessageMap messageMap;
-
- public JsonRpcEndpoint(ObjectMapper objectMapper, Channel channel, JsonRpcMessageMap messageMap) {
+ private String identifier;
+ private ObjectMapper objectMapper;
+ private Channel nettyChannel;
+ private Map<String, CallContext> methodContext = Maps.newHashMap();
+ private RpcMessageMap messageMap;
+ private RpcBroker broker;
+ private ConnectionService connectionService;
+
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ public void setIdentifier(String identifier) {
+ this.identifier = identifier;
+ }
+
+ public ConnectionService getConnectionService() {
+ return connectionService;
+ }
+ public void setConnectionService(ConnectionService connectionService) {
+ this.connectionService = connectionService;
+ }
+
+ public Channel getChannel() {
+ return nettyChannel;
+ }
+
+ public boolean supportsMessages(List<RpcMessage> messages) {
+ return messageMap.containsMessages(messages);
+ }
+
+ public JsonRpcEndpoint(String identifier, ConnectionService connectionService,
+ ObjectMapper objectMapper, Channel channel,
+ RpcMessageMap messageMap, RpcBroker broker) {
+ this.identifier = identifier;
+ this.connectionService = connectionService;
this.objectMapper = objectMapper;
this.nettyChannel = channel;
this.messageMap = messageMap;
+ this.broker = broker;
}
- // This implementation will change -- modified port for testing only
- public SettableFuture<Object> invoke(JsonRpcMessage message) throws Throwable {
+ /**
+ *
+ * Send a concrete {@link RpcMessage} to the RPC endpoint.
+ *
+ * @param message The concrete {@link RpcMessage} to send
+ * @return SettableFuture<Object> The caller can use the returned
+ * object to wait for the response (currently no timeout)
+ * @throws Throwable The concrete message couldn't be serialized and sent
+ */
+ public SettableFuture<Object> sendRequest(RpcMessage message) throws Throwable {
if (messageMap.get(message.getName()) == null) {
return null;
}
- JsonRpc10Request request = new JsonRpc10Request(UUID.randomUUID().toString());
- request.setMethod(message.getName());
- request.setParams(message);
+ message.setId(UUID.randomUUID().toString());
- String s = objectMapper.writeValueAsString(request);
- logger.trace("{}", s);
+ String s = objectMapper.writeValueAsString(message);
+ logger.trace("invoke: {}", s);
SettableFuture<Object> sf = SettableFuture.create();
- methodContext.put(request.getId(), new CallContext(request, message.getName(), sf));
+ methodContext.put(message.getId(), new CallContext(message, message.getName(), sf));
nettyChannel.writeAndFlush(s);
return sf;
}
- // This implementation will change -- modified port for testing only
+ /**
+ *
+ * Send a response to a previous {@link RpcMessage}request
+ *
+ * @param message The concrete {@link RpcMessage}
+ * @throws Throwable The concrete message couldn't be serialized and sent
+ */
+ public void sendResponse (RpcMessage message) throws Throwable {
+
+ String s = objectMapper.writeValueAsString(message);
+ logger.warn("sendResponse: {}", s);
+
+ nettyChannel.writeAndFlush(s);
+ }
+
+ /**
+ *
+ * Handle an {@link RpcMessage} response from the peer.
+ *
+ * @param response A fully parsed Jackson Tree-Mode JsonNode
+ * @throws NoSuchMethodException Internal error
+ */
public void processResult(JsonNode response) throws NoSuchMethodException {
logger.trace("Response : {}", response.toString());
CallContext returnCtxt = methodContext.get(response.get("id").asText());
if (returnCtxt == null) return;
- JsonRpcMessage message = messageMap.get(returnCtxt.getMethod());
+ RpcMessage message = messageMap.get(returnCtxt.getMethod());
if (message != null) {
try {
- JsonRpcMessage handler = objectMapper.treeToValue(response, message.getClass());
- handler.invoke();
+ RpcMessage handler = objectMapper.treeToValue(response, message.getClass());
JsonNode error = response.get("error");
if (error != null && !error.isNull()) {
logger.error("Error : {}", error.toString());
}
- returnCtxt.getFuture().set(handler);
+ returnCtxt.getFuture().set(handler);
} catch (JsonProcessingException e) {
logger.error("Unable to handle " + returnCtxt.getMethod(), e);
}
} else {
- throw new RuntimeException("donno how to deal with this");
+ throw new RuntimeException("The response to " + returnCtxt.getMethod() +
+ "sent is unsupported");
}
}
- // This implementation will change -- modified port for testing only
+ /**
+ *
+ * Handle incoming {@link RpcMessage} requests. The supported messages
+ * are defined by the endpoint's message map.
+ *
+ * @param requestJson A Jackson JsonNode that has had full Tree-Mode parsing
+ */
public void processRequest(JsonNode requestJson) {
- JsonRpc10Request request = new JsonRpc10Request(requestJson.get("id").asText());
- request.setMethod(requestJson.get("method").asText());
- logger.trace("Request : {} {}", requestJson.get("method"), requestJson.get("params"));
-
- JsonRpcMessage callback = messageMap.get(request.getMethod());
+ RpcMessage message;
+ RpcMessage callback = messageMap.get(requestJson.get("method").asText());
if (callback != null) {
try {
- JsonRpcMessage message = objectMapper.treeToValue(requestJson, callback.getClass());
- message.invoke();
+ logger.trace("Request : {} {}", requestJson.get("method"), requestJson.get("params"));
+
+ message = objectMapper.treeToValue(requestJson, callback.getClass());
+ message.setId(requestJson.get("id").asText());
+ //message.setMethod(requestJson.get("method").asText());
+
+ broker.publish(this, message);
} catch (JsonProcessingException e) {
- logger.error("Unable to invoke callback " + request.getMethod(), e);
+ logger.error("Unable to invoke callback " + callback.getName(), e);
}
return;
}
// Echo dont need any special processing. hence handling it internally.
- if (request.getMethod().equals("echo")) {
- JsonRpc10Response response = new JsonRpc10Response(request.getId());
+ if (requestJson.get("method").asText().equals("echo")) {
+ JsonRpc10Response response = new JsonRpc10Response(requestJson.get("id").asText());
response.setError(null);
String s = null;
try {
logger.error("No handler for Request : {}",requestJson.toString());
}
- public Map<String, CallContext> getMethodContext() {
- return methodContext;
+ @Override
+ public void operationComplete(ChannelFuture arg0) throws Exception {
+ connectionService.channelClosed(this);
}
}
+++ /dev/null
-/*
- * Copyright (C) 2014 Cisco Systems, Inc.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- *
- * Authors : Thomas Bachman
- */
-package org.opendaylight.groupbasedpolicy.jsonrpc;
-
-public abstract class JsonRpcMessage {
-
- public abstract String getName();
- public abstract void setName(String name);
- public abstract void invoke();
-}
+++ /dev/null
-/*
- * Copyright (C) 2013 EBay Software Foundation
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- *
- * Authors : Ashwin Raveendran
- */
-package org.opendaylight.groupbasedpolicy.jsonrpc;
-
-import java.util.List;
-
-public interface Params {
- List<Object> params();
-}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.jsonrpc;
+
+/**
+ * The {@link RpcBroker} provides a content-based pub/sub per RpcMessage
+ * type. This allows clients to register for the messages they are interested
+ * in.
+ *
+ * @author tbachman
+ */
+public interface RpcBroker {
+
+ /**
+ * The {@link RpcCallback} provides a callback interface for the
+ * {@link RpcBroker}. When the broker needs to publish a new
+ * {@link RpcMessage}, it invokes the callbacks that were
+ * registered for that message.
+ *
+ * @author tbachman
+ */
+ public interface RpcCallback {
+
+ /**
+ * Callback that's invoked when the {@link RpcMessage}
+ * request message is received
+ *
+ * @param endpoint The endpoint that received the messgae
+ * @param message The concrete {@RpcMessage} received
+ */
+ public void callback(JsonRpcEndpoint endpoint, RpcMessage message);
+
+ }
+
+ /**
+ *
+ * Subscribe to a concrete {@RpcMessage}
+ *
+ * @param message The concrete {@link RpcMessage} message to subscribe to
+ * @param callback The callback to invoke when the message is published
+ *
+ */
+ public void subscribe(RpcMessage message, RpcCallback callback);
+
+ /**
+ * Notification to call when a new {@link RpcMessage} request
+ * is received
+ *
+ * @param endpoint The endpoint that received this message
+ * @param message the concrete {@RpcMessage}
+ */
+ public void publish(JsonRpcEndpoint endpoint, RpcMessage message);
+
+
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.jsonrpc;
+
+/**
+ * The abstract {@link RpcMessage} is used for creating application
+ * specific RPC messages that can be used by the RPC library. The RPC
+ * library uses these for serialization and deserialization of messages.
+ *
+ * <p>The class provides notifiers for request and response messages,
+ * and provides for sending new requests.
+ *
+ * <p>The class should be used to store the
+ *
+ * @author tbachman
+ */
+public abstract class RpcMessage {
+
+ public abstract String getName();
+ public abstract void setName(String name);
+ public abstract String getId();
+ public abstract void setId(String id);
+ public abstract String getMethod();
+ public abstract void setMethod(String method);
+}
*/
package org.opendaylight.groupbasedpolicy.jsonrpc;
+import java.util.List;
import java.util.Map;
import com.google.common.collect.Maps;
-public class JsonRpcMessageMap {
- private Map<String, JsonRpcMessage> messageMap;
-
- public JsonRpcMessageMap() {
+public class RpcMessageMap {
+ private Map<String, RpcMessage> messageMap;
+
+ public boolean containsMessages (List<RpcMessage> messages) {
+ return messages.containsAll(messageMap.values());
+ }
+
+ public RpcMessageMap() {
messageMap = Maps.newHashMap();
}
-
- public JsonRpcMessage get(String messageName) {
+
+ public RpcMessage get(String messageName) {
return messageMap.get(messageName);
}
- public void add(JsonRpcMessage message) {
+ public void add(RpcMessage message) {
messageMap.put(message.getName(), message);
}
+
+ public void addList(List<RpcMessage> messages) {
+ for ( RpcMessage msg : messages ) {
+ this.add(msg);
+ }
+ }
}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.jsonrpc;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.AdaptiveRecvByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.string.StringEncoder;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.util.CharsetUtil;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * A (soon-to-be) generic RPC server. It creates {@link JsonRpcEndpoint} objects
+ * for each new connection. The RpcServer has a set of {@link RpcMessage}
+ * types that it supports, and it passes on these supported messages
+ * to the {@link JsonRpcEndpoint} objects that it creates.
+ *
+ * TODO: add serialization type, and refactor so serialization determines
+ * concrete RpcEndpoint object (only JsonRpcEndpoint right now).
+ * TODO: This and other classes are tightly coupled to netty -- make abstraction?
+ */
+public class RpcServer {
+ protected static final Logger logger =
+ LoggerFactory.getLogger(RpcServer.class);
+
+ String identity;
+ int listenPort;
+ Channel channel;
+ RpcMessageMap messageMap;
+ ConnectionService connectionService;
+ RpcBroker broker;
+
+ public RpcServer() {
+ messageMap = new RpcMessageMap();
+ }
+
+ public RpcServer(String identity, int port) {
+ messageMap = new RpcMessageMap();
+ this.listenPort = port;
+ this.identity = identity;
+ }
+
+ public void addMessage(RpcMessage message) {
+ this.messageMap.add(message);
+ }
+
+ public void addMessageList(List<RpcMessage> messageList) {
+ this.messageMap.addList(messageList);
+ }
+
+ public void setConnectionService(ConnectionService connectionService) {
+ this.connectionService = connectionService;
+ }
+
+ public void setRpcBroker(RpcBroker broker) {
+ this.broker = broker;
+ }
+
+ public void setChannel(Channel channel) {
+ this.channel = channel;
+ }
+
+ public Channel getChannel() {
+ return this.channel;
+ }
+
+ private void handleNewConnection(String identifier, Channel channel)
+ throws InterruptedException, ExecutionException {
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.configure(
+ DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+ JsonRpcEndpoint endpoint = new JsonRpcEndpoint(identifier, connectionService,
+ objectMapper, channel, messageMap, broker);
+ JsonRpcServiceBinderHandler binderHandler =
+ new JsonRpcServiceBinderHandler(endpoint);
+ channel.pipeline().addLast(binderHandler);
+
+ connectionService.addConnection(endpoint);
+
+ ChannelFuture closeFuture = channel.closeFuture();
+ closeFuture.addListener(endpoint);
+ }
+
+ public void start() {
+ EventLoopGroup bossGroup = new NioEventLoopGroup();
+ EventLoopGroup workerGroup = new NioEventLoopGroup();
+ try {
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .option(ChannelOption.SO_BACKLOG, 100)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel channel)
+ throws Exception {
+ logger.debug("New Passive channel created : "
+ + channel.toString());
+ InetAddress address = channel.remoteAddress()
+ .getAddress();
+ int port = channel.remoteAddress().getPort();
+ String identifier = address.getHostAddress() + ":"
+ + port;
+ channel.pipeline().addLast(
+ new LoggingHandler(LogLevel.INFO),
+ new JsonRpcDecoder(100000),
+ new StringEncoder(CharsetUtil.UTF_8));
+
+ handleNewConnection(identifier, channel);
+ logger.warn("Connected Node : " + identifier);
+ }
+ });
+ b.option(ChannelOption.TCP_NODELAY, true);
+ b.option(ChannelOption.RCVBUF_ALLOCATOR,
+ new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
+ // Start the server.
+ ChannelFuture f = b.bind(identity, listenPort).sync();
+ String id = f.channel().localAddress().toString();
+ logger.warn("Connected Node : " + id);
+
+ this.channel = f.channel();
+
+ // Wait until the server socket is closed.
+ f.channel().closeFuture().sync();
+ } catch (InterruptedException e) {
+ logger.error("Thread interrupted", e);
+ } finally {
+ // Shut down all event loops to terminate all threads.
+ bossGroup.shutdownGracefully();
+ workerGroup.shutdownGracefully();
+ }
+ }
+}
+++ /dev/null
-/*
- * Copyright (C) 2013 Red Hat, Inc.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- *
- * Authors : Madhu Venugopal, Brent Salisbury, Thomas Bachman
- */
-package org.opendaylight.groupbasedpolicy.renderer.opflex;
-
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-
-public class ChannelConnectionHandler implements ChannelFutureListener {
- Connection peer;
- OpflexConnectionService connectionService;
- public Connection getPeer() {
- return peer;
- }
- public void setPeer(Connection peer) {
- this.peer = peer;
- }
- public OpflexConnectionService getConnectionService() {
- return connectionService;
- }
- public void setConnectionService(OpflexConnectionService connectionService) {
- this.connectionService = connectionService;
- }
- @Override
- public void operationComplete(ChannelFuture arg0) throws Exception {
- connectionService.channelClosed(peer);
- }
-}
+++ /dev/null
-/*
- * Copyright (C) 2013 Red Hat, Inc.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- *
- * Authors : Madhu Venugopal, Brent Salisbury, Thomas Bachman
- */
-package org.opendaylight.groupbasedpolicy.renderer.opflex;
-
-import io.netty.channel.Channel;
-
-import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
-
-public class Connection {
- private String identifier;
- private Channel channel;
- private JsonRpcEndpoint endpoint;
-
- public Long getIdCounter() {
- return idCounter;
- }
-
- public void setIdCounter(Long idCounter) {
- this.idCounter = idCounter;
- }
-
- private Long idCounter;
-
- public Connection(String identifier, Channel channel) {
-
- super();
-
- this.identifier = identifier;
- this.channel = channel;
- this.idCounter = 0L;
- }
-
- public String getIdentifier() {
- return identifier;
- }
-
- public void setIdentifier(String identifier) {
- this.identifier = identifier;
- }
-
- public Channel getChannel() {
- return this.channel;
- }
-
- public void setChannel(Channel channel) {
- this.channel = channel;
- }
-
- public void setEndpoint(JsonRpcEndpoint endpoint) {
- this.endpoint = endpoint;
- }
-
- public JsonRpcEndpoint getEndpoint() {
- return this.endpoint;
- }
-
-
- public void sendMessage(String message) {
- channel.writeAndFlush(message);
- this.idCounter++;
- }
-
- public void disconnect() {
- channel.close();
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((identifier == null) ? 0 : identifier.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) return true;
- if (obj == null) return false;
- if (getClass() != obj.getClass()) return false;
- Connection other = (Connection) obj;
- if (identifier == null) {
- if (other.identifier != null) return false;
- } else if (!identifier.equals(other.identifier)) return false;
- return true;
- }
-}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.List;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize
+@JsonDeserialize
+public class EndpointDeclarationRequest extends RpcMessage {
+
+ public static final String DECLARATION_MESSAGE = "endpoint_declaration";
+
+ static public class Params {
+ private String subject;
+ private String context;
+ private String policy_name;
+ private String location;
+ private List<String> identifier;
+ private List<String> data;
+ private String status;
+ private int prr;
+ public String getSubject() {
+ return subject;
+ }
+ public void setSubject(String subject) {
+ this.subject = subject;
+ }
+ public String getContext() {
+ return context;
+ }
+ public void setContext(String context) {
+ this.context = context;
+ }
+ public String getPolicy_name() {
+ return policy_name;
+ }
+ public void setPolicy_name(String policy_name) {
+ this.policy_name = policy_name;
+ }
+ public String getLocation() {
+ return location;
+ }
+ public void setLocation(String location) {
+ this.location = location;
+ }
+ public List<String> getIdentifier() {
+ return identifier;
+ }
+ public void setIdentifier(List<String> identifier) {
+ this.identifier = identifier;
+ }
+ public List<String> getData() {
+ return data;
+ }
+ public void setData(List<String> data) {
+ this.data = data;
+ }
+ public String getStatus() {
+ return status;
+ }
+ public void setStatus(String status) {
+ this.status = status;
+ }
+ public int getPrr() {
+ return prr;
+ }
+ public void setPrr(int prr) {
+ this.prr = prr;
+ }
+ }
+ private String id;
+ private String method;
+ private List<Params> params;
+
+ @JsonIgnore
+ private String name;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getMethod() {
+ return method;
+ }
+
+ @Override
+ public void setMethod(String method) {
+ this.method = method;
+ }
+
+ public List<Params> getParams() {
+ return this.params;
+ }
+
+ public void setParams(List<Params> params) {
+ this.params = params;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public EndpointDeclarationRequest(String name) {
+ this.name = name;
+ }
+
+ public EndpointDeclarationRequest() {
+ this.name = DECLARATION_MESSAGE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize
+@JsonDeserialize
+public class EndpointDeclarationResponse extends RpcMessage {
+
+ public static final String ENDPOINT_MESSAGE_RESPONSE = "endpoint_declaration_response";
+
+ static public class Result {
+ }
+ static public class Error {
+ private String message;
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+ }
+ private String id;
+ private Result result;
+ private Error error;
+
+ @JsonIgnore
+ private String name;
+ @JsonIgnore
+ private String method;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public Error getError() {
+ return error;
+ }
+
+ public void setError(Error error) {
+ this.error = error;
+ }
+
+ @Override
+ public String getMethod() {
+ return null;
+ }
+
+ @Override
+ public void setMethod(String method) {
+ }
+
+ public Result getResult() {
+ return this.result;
+ }
+
+ public void setResult(Result result) {
+ this.result = result;
+ }
+
+ public EndpointDeclarationResponse(String name) {
+ this.name = name;
+ }
+
+ public EndpointDeclarationResponse() {
+ this.name = ENDPOINT_MESSAGE_RESPONSE;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.List;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize
+@JsonDeserialize
+public class EndpointPolicyUpdateRequest extends RpcMessage {
+
+ public static final String EP_UPDATE_MESSAGE = "endpoint_update_policy";
+
+ static public class Params {
+ private String subject;
+ private String context;
+ private String policy_name;
+ private String location;
+ private List<String> identifier;
+ private List<String> data;
+ private String status;
+ private int ttl;
+ public String getSubject() {
+ return subject;
+ }
+ public void setSubject(String subject) {
+ this.subject = subject;
+ }
+ public String getContext() {
+ return context;
+ }
+ public void setContext(String context) {
+ this.context = context;
+ }
+ public String getPolicy_name() {
+ return policy_name;
+ }
+ public void setPolicy_name(String policy_name) {
+ this.policy_name = policy_name;
+ }
+ public String getLocation() {
+ return location;
+ }
+ public void setLocation(String location) {
+ this.location = location;
+ }
+ public List<String> getIdentifier() {
+ return identifier;
+ }
+ public void setIdentifier(List<String> identifier) {
+ this.identifier = identifier;
+ }
+ public List<String> getData() {
+ return data;
+ }
+ public void setData(List<String> data) {
+ this.data = data;
+ }
+ public String getStatus() {
+ return status;
+ }
+ public void setStatus(String status) {
+ this.status = status;
+ }
+ public int getTtl() {
+ return ttl;
+ }
+ public void setTtl(int ttl) {
+ this.ttl = ttl;
+ }
+ }
+ private String id;
+ private String method;
+ private List<Params> params;
+
+ @JsonIgnore
+ private String name;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getMethod() {
+ return method;
+ }
+
+ @Override
+ public void setMethod(String method) {
+ this.method = method;
+ }
+
+ public List<Params> getParams() {
+ return this.params;
+ }
+
+ public void setParams(List<Params> params) {
+ this.params = params;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public EndpointPolicyUpdateRequest(String name) {
+ this.name = name;
+ }
+
+ public EndpointPolicyUpdateRequest() {
+ this.name = EP_UPDATE_MESSAGE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize
+@JsonDeserialize
+public class EndpointPolicyUpdateResponse extends RpcMessage {
+
+ public static final String POLICY_UPDATE_MESSAGE_RESPONSE = "endpoint_update_policy_response";
+
+ static public class Result {
+ }
+ static public class Error {
+ private String message;
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+ }
+ private String id;
+ private Result result;
+ private Error error;
+
+ @JsonIgnore
+ private String name;
+ @JsonIgnore
+ private String method;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public Error getError() {
+ return error;
+ }
+
+ public void setError(Error error) {
+ this.error = error;
+ }
+
+ @Override
+ public String getMethod() {
+ return null;
+ }
+
+ @Override
+ public void setMethod(String method) {
+ }
+
+ public Result getResult() {
+ return this.result;
+ }
+
+ public void setResult(Result result) {
+ this.result = result;
+ }
+
+ public EndpointPolicyUpdateResponse(String name) {
+ this.name = name;
+ }
+
+ public EndpointPolicyUpdateResponse() {
+ this.name = POLICY_UPDATE_MESSAGE_RESPONSE;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.List;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize
+@JsonDeserialize
+public class EndpointRequestRequest extends RpcMessage {
+
+ public static final String EP_REQUEST_MESSAGE = "endpoint_request";
+
+ static public class Params {
+ private String subject;
+ private String context;
+ private List<String> identifier;
+ public String getSubject() {
+ return subject;
+ }
+ public void setSubject(String subject) {
+ this.subject = subject;
+ }
+ public String getContext() {
+ return context;
+ }
+ public void setContext(String context) {
+ this.context = context;
+ }
+ public List<String> getIdentifier() {
+ return identifier;
+ }
+ public void setIdentifier(List<String> identifier) {
+ this.identifier = identifier;
+ }
+ }
+ private String id;
+ private String method;
+ private List<Params> params;
+
+ @JsonIgnore
+ private String name;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getMethod() {
+ return method;
+ }
+
+ @Override
+ public void setMethod(String method) {
+ this.method = method;
+ }
+
+ public List<Params> getParams() {
+ return this.params;
+ }
+
+ public void setParams(List<Params> params) {
+ this.params = params;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public EndpointRequestRequest(String name) {
+ this.name = name;
+ }
+
+ public EndpointRequestRequest() {
+ this.name = EP_REQUEST_MESSAGE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.List;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize
+@JsonDeserialize
+public class EndpointRequestResponse extends RpcMessage {
+
+ public static final String REQUEST_MESSAGE_RESPONSE = "endpoint_request_response";
+
+ static public class Endpoint {
+ private String subject;
+ private String context;
+ private String policy_name;
+ private String location;
+ private List<String> identifier;
+ private List<String> data;
+ private String status;
+ private int prr;
+ public String getSubject() {
+ return subject;
+ }
+ public void setSubject(String subject) {
+ this.subject = subject;
+ }
+ public String getContext() {
+ return context;
+ }
+ public void setContext(String context) {
+ this.context = context;
+ }
+ public String getPolicy_name() {
+ return policy_name;
+ }
+ public void setPolicy_name(String policy_name) {
+ this.policy_name = policy_name;
+ }
+ public String getLocation() {
+ return location;
+ }
+ public void setLocation(String location) {
+ this.location = location;
+ }
+ public List<String> getIdentifier() {
+ return identifier;
+ }
+ public void setIdentifier(List<String> identifier) {
+ this.identifier = identifier;
+ }
+ public List<String> getData() {
+ return data;
+ }
+ public void setData(List<String> data) {
+ this.data = data;
+ }
+ public String getStatus() {
+ return status;
+ }
+ public void setStatus(String status) {
+ this.status = status;
+ }
+ public int getPrr() {
+ return prr;
+ }
+ public void setPrr(int prr) {
+ this.prr = prr;
+ }
+ }
+
+ static public class Result {
+ List<Endpoint> endpoint;
+
+ public List<Endpoint> getEndpoint() {
+ return endpoint;
+ }
+
+ public void setEndpoint(List<Endpoint> endpoint) {
+ this.endpoint = endpoint;
+ }
+ }
+
+ static public class Error {
+ private String message;
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+ }
+ private String id;
+ private Result result;
+ private Error error;
+
+ @JsonIgnore
+ private String name;
+ @JsonIgnore
+ private String method;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public Error getError() {
+ return error;
+ }
+
+ public void setError(Error error) {
+ this.error = error;
+ }
+
+ @Override
+ public String getMethod() {
+ return null;
+ }
+
+ @Override
+ public void setMethod(String method) {
+ }
+
+ public Result getResult() {
+ return this.result;
+ }
+
+ public void setResult(Result result) {
+ this.result = result;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public EndpointRequestResponse(String name) {
+ this.name = name;
+ }
+
+ public EndpointRequestResponse() {
+ this.name = REQUEST_MESSAGE_RESPONSE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize
+@JsonDeserialize
+public class IdentityRequest extends RpcMessage {
+
+ public static final String IDENTITY_MESSAGE = "send_identity";
+
+ static public class Params {
+ private String name;
+ private String domain;
+ private List<String> my_role;
+ public String getName() {
+ return this.name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public Params() {
+ my_role = new ArrayList<String>();
+ }
+
+ public String getDomain() {
+ return domain;
+ }
+
+ public void setDomain(String domain) {
+ this.domain = domain;
+ }
+
+ public List<String> getMy_role() {
+ return my_role;
+ }
+
+ public void setMy_role(List<String> my_role) {
+ this.my_role = my_role;
+ }
+
+
+ }
+ private String id;
+ private String method;
+ private List<Params> params;
+
+ @JsonIgnore
+ private String name;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getMethod() {
+ return method;
+ }
+
+ @Override
+ public void setMethod(String method) {
+ this.method = method;
+ }
+
+ public List<Params> getParams() {
+ return this.params;
+ }
+
+ public void setParams(List<Params> params) {
+ this.params = params;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public IdentityRequest(String name) {
+ this.name = name;
+ }
+
+ public IdentityRequest() {
+ this.name = IDENTITY_MESSAGE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize
+@JsonDeserialize
+public class IdentityResponse extends RpcMessage {
+
+ public static final String IDENTITY_MESSAGE_RESPONSE = "send_identity_response";
+
+ static public class Peer {
+ private String role;
+ private String connectivity_info;
+
+ public String getRole() {
+ return role;
+ }
+
+ public void setRole(String role) {
+ this.role = role;
+ }
+
+ public String getConnectivity_info() {
+ return connectivity_info;
+ }
+
+ public void setConnectivity_info(String connectivity_info) {
+ this.connectivity_info = connectivity_info;
+ }
+
+ public Peer() {
+ }
+ }
+
+ static public class Result {
+ private String name;
+ private String domain;
+ private List<String> my_role;
+ private List<Peer> peers;
+
+ public String getName() {
+ return this.name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public Result() {
+ my_role = new ArrayList<String>();
+ }
+
+ public String getDomain() {
+ return domain;
+ }
+
+ public void setDomain(String domain) {
+ this.domain = domain;
+ }
+
+ public List<String> getMy_role() {
+ return my_role;
+ }
+
+ public void setMy_role(List<String> my_role) {
+ this.my_role = my_role;
+ }
+
+ public List<Peer> getPeers() {
+ return peers;
+ }
+
+ public void setPeers(List<Peer> peers) {
+ this.peers = peers;
+ }
+
+ }
+ static public class Error {
+ private String message;
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+ }
+ private String id;
+ private Result result;
+ private Error error;
+
+ @JsonIgnore
+ private String name;
+ @JsonIgnore
+ private String method;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public Error getError() {
+ return error;
+ }
+
+ public void setError(Error error) {
+ this.error = error;
+ }
+
+ @Override
+ public String getMethod() {
+ return null;
+ }
+
+ @Override
+ public void setMethod(String method) {
+ }
+
+ public Result getResult() {
+ return this.result;
+ }
+
+ public void setResult(Result result) {
+ this.result = result;
+ }
+
+ public IdentityResponse(String name) {
+ this.name = name;
+ }
+
+ public IdentityResponse() {
+ this.name = IDENTITY_MESSAGE_RESPONSE;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+}
*/
package org.opendaylight.groupbasedpolicy.renderer.opflex;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.AdaptiveRecvByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.string.StringEncoder;
-import io.netty.handler.logging.LogLevel;
-import io.netty.handler.logging.LoggingHandler;
-import io.netty.util.CharsetUtil;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
-import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
-import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcDecoder;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.groupbasedpolicy.jsonrpc.ConnectionService;
import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
-import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcMessageMap;
-import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcServiceBinderHandler;
-import org.opendaylight.groupbasedpolicy.renderer.opflex.ChannelConnectionHandler;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcBroker;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcServer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DiscoveryDefinitions;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.EndpointRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.Observer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.PolicyRepository;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
/*
- * Defines the JSON RPC methods supported, and creates
- * the connection/discovery server
+ * Manages the different OpFlex entity connections. It does this
+ * on behalf of each logical OpFlex entity:
+ * o Policy Repositories
+ * o Endpoint Registries
+ * o Observers
+ *
+ * Each OpFlex entity defines the JSON RPC methods supported, and
+ * manages their connection/discovery using dedicated servers.
+ * Servers and connections are maintained in dedicated client and
+ * server maps.
+ *
+ * TODO: calls to add messages to policy repository, EP registry , and observer
+ * TODO: incorporate OpFlex domain
+ * TODO: break into smaller pieces?
*/
-public class OpflexConnectionService {
- protected static final Logger logger = LoggerFactory.getLogger(OpflexConnectionService.class);
+public class OpflexConnectionService
+ implements ConnectionService, RpcBroker,
+ RpcBroker.RpcCallback, DataChangeListener, AutoCloseable {
+ protected static final Logger logger =
+ LoggerFactory.getLogger(OpflexConnectionService.class);
+
+ public enum Role {
+ POLICY_REPOSITORY("policy_repository"),
+ ENDPOINT_REGISTRY("endpoint_registry"),
+ OBSERVER("observer"),
+ POLICY_ELEMENT("policy_element");
+
+ private String role;
+ Role(String role) {
+ this.role = role;
+ }
+ @Override
+ public String toString() {
+ return this.role;
+ }
+ }
+
+ private class OpflexConnection {
+ String identity;
+ List<Role> roles;
+ JsonRpcEndpoint endpoint;
+
+ public OpflexConnection() {
+ }
+
+ public String getIdentity() {
+ return identity;
+ }
+
+ public void setIdentity(String identity) {
+ this.identity = identity;
+ }
+
+ public List<Role> getRoles() {
+ return roles;
+ }
+
+ public void setRoles(List<Role> roles) {
+ this.roles = roles;
+ }
+
+ public JsonRpcEndpoint getEndpoint() {
+ return endpoint;
+ }
+
+ public void setEndpoint(JsonRpcEndpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ }
+
+ public class OpflexRpcServer {
+ private String identity;
+ private List<Role> roles;
+ private RpcServer server;
+
+ public OpflexRpcServer() {
+ roles = new ArrayList<Role>();
+ }
+
+ public OpflexRpcServer(String identity) {
+ this.identity = identity;
+ }
+
+ public OpflexRpcServer(String identity, List<Role> roles) {
+ this.identity = identity;
+ this.roles = roles;
+ }
+
+ public String getId() {
+ return this.identity;
+ }
+
+ public void setRpcServer(RpcServer server) {
+ this.server = server;
+ }
+
+ public RpcServer getRpcServer() {
+ return this.server;
+ }
+
+ public void addRole(Role role) {
+ if (!this.roles.contains(role))
+ this.roles.add(role);
+ }
+
+ public List<Role> getRoles() {
+ return this.roles;
+ }
+
+ public boolean sameServer(OpflexRpcServer srv) {
+ if (this == srv)
+ return true;
+ if (srv == null)
+ return false;
+ if (!this.identity.equals(srv.identity))
+ return false;
+ if (this.roles == null && srv.roles == null)
+ return true;
+ if (this.roles == null || srv.roles == null)
+ return false;
+ if (this.roles.size() == srv.roles.size() && this.roles.containsAll(srv.roles))
+ return true;
+ return false;
+ }
+ }
// Properties that can be set in config.ini
- private static final String OPFLEX_LISTENPORT = "opflex.listenPort";
+ static final String OPFLEX_LISTENPORT = "opflex.listenPort";
private static final Integer defaultOpflexPort = 6670;
+ static final String OPFLEX_LISTENIP = "opflex.listenIp";
+ private static final String defaultOpflexIp = "0.0.0.0";
private static Integer opflexListenPort = defaultOpflexPort;
- private ConcurrentMap<String, Connection> opflexConnections;
- private Channel serverListenChannel = null;
- private DataBrokerService dataProvider;
- private JsonRpcMessageMap messageMap;
-
- public void setDataProvider(DataBrokerService salDataProvider) {
- // TODO: use this for OpFlex configuration data changes
- this.dataProvider = salDataProvider;
-
- // TODO: Get configuration for various servers,
- // to be sent in OpFlex Identity message
- start();
- }
-
- public void start() {
- opflexConnections = new ConcurrentHashMap<String, Connection>();
+ private static String opflexListenIp = defaultOpflexIp;
+
+ ConcurrentMap<String, OpflexConnection> opflexAgents = null;
+ ConcurrentMap<String, OpflexRpcServer> opflexServers = null;
+ ConcurrentMap<String, List<RpcCallback>> brokerMap = null;
+ List<RpcMessage> policyRepositoryMessages;
+ List<RpcMessage> endpointRegistryMessages;
+ List<RpcMessage> observerMessages;
+ private DataBroker dataProvider;
+
+ public static final InstanceIdentifier<DiscoveryDefinitions> DISCOVERY_DEFINITIONS_IID =
+ InstanceIdentifier.builder(DiscoveryDefinitions.class).build();
+
+
+ /**
+ *
+ * Set the data store provider for the OpFlex connection service, and
+ * start the service using this data store.
+ *
+ * @param salDataProvider The MD-SAl data store provider
+ */
+ public void setDataProvider(DataBroker salDataProvider) {
+ dataProvider = salDataProvider;
+
+ startOpflexManager();
+ }
+
+ private DiscoveryDefinitions getDiscoveryDefinitions() {
+
+ ReadTransaction t = dataProvider.newReadOnlyTransaction();
+ ListenableFuture<Optional<DataObject>> f = t.read(LogicalDatastoreType.CONFIGURATION, DISCOVERY_DEFINITIONS_IID);
+ try {
+ Optional<DataObject> dao = f.get();
+ if (dao.get() != null && dao.get() instanceof DiscoveryDefinitions) {
+ return (DiscoveryDefinitions)dao.get();
+ }
+ }
+ catch ( Exception e ) {
+ logger.warn("Not sure what happens here");
+ }
+ return null;
+
+ }
+
+ private List<OpflexRpcServer> setDefaultIdentities() {
+
+ /*
+ * Create a single server, filling all roles
+ */
+ String identity = opflexListenIp + ":" + opflexListenPort.toString();
+ List<OpflexRpcServer> srvList = new ArrayList<OpflexRpcServer>();
+ List<Role> roles = new ArrayList<Role>();
+ roles.add(Role.POLICY_REPOSITORY);
+ roles.add(Role.ENDPOINT_REGISTRY);
+ roles.add(Role.OBSERVER);
+ OpflexRpcServer srv = new OpflexRpcServer(identity, roles);
+ srvList.add(srv);
+ return srvList;
+
+ }
+
+ private List<OpflexRpcServer> createServerList() {
+ DiscoveryDefinitions identities = getDiscoveryDefinitions();
+ if (identities != null) {
+ Map<String, OpflexRpcServer> servers =
+ new ConcurrentHashMap<String, OpflexRpcServer>();
+ List<String> addList = getPolicyRepositories(identities.getPolicyRepository());
+ addServerList(servers, addList, Role.POLICY_REPOSITORY);
+ addList = getEndpointRegistries(identities.getEndpointRegistry());
+ addServerList(servers, addList, Role.ENDPOINT_REGISTRY);
+ addList = getObservers(identities.getObserver());
+ addServerList(servers, addList, Role.OBSERVER);
+ return(new ArrayList<OpflexRpcServer>(servers.values()));
+ }
+ else {
+ return setDefaultIdentities();
+ }
+ }
+
+ private void initializeServers() {
+
+ /*
+ * Get the configured identities, if any. If lists are empty,
+ * set up a single instance of each, using the localhost
+ * interface
+ */
+ List<OpflexRpcServer> serverList = createServerList();
+ addServers(serverList);
+ }
+
+
+ private List<String> getPolicyRepositories(List<PolicyRepository> repositories ) {
+ List<String> identityList = new ArrayList<String>() ;
+ for ( PolicyRepository pr: repositories.toArray(new PolicyRepository[0]) ) {
+ String identity = pr.getId() + ":" + pr.getPort().toString();
+ identityList.add(identity);
+ }
+ return identityList;
+ }
+
+ private List<String> getEndpointRegistries(List<EndpointRegistry> registries ) {
+ List<String> identityList = new ArrayList<String>() ;
+ for ( EndpointRegistry epr: registries.toArray(new EndpointRegistry[0]) ) {
+ String identity = epr.getId() + ":" + epr.getPort().toString();
+ identityList.add(identity);
+ }
+ return identityList;
+ }
+
+ private List<String> getObservers(List<Observer> observers ) {
+ List<String> identityList = new ArrayList<String>() ;
+ for ( Observer o: observers.toArray(new Observer[0]) ) {
+ String identity = o.getId() + ":" + o.getPort().toString();
+ identityList.add(identity);
+ }
+ return identityList;
+ }
+
+ private void addServerList( Map<String, OpflexRpcServer> servers,
+ List<String> idList, Role role ) {
+ if (idList == null || idList.size() <= 0)
+ return;
+
+ for ( String id : idList ) {
+ List<Role> roles = new ArrayList<Role>();
+ OpflexRpcServer srv = servers.get(id);
+ if (srv != null ) {
+ roles = srv.getRoles();
+ servers.remove(id);
+ }
+
+ roles.add(role);
+ srv = new OpflexRpcServer(id, roles);
+ servers.put(id, srv);
+ }
+
+ }
+
+ private void launchRpcServer(OpflexRpcServer srv) {
+ RpcServer rpcSrv = new RpcServer(srv.getId().split(":")[0],
+ Integer.parseInt(srv.getId().split(":")[1]));
+ rpcSrv.setConnectionService(this);
+ rpcSrv.setRpcBroker(this);
+
+ /*
+ * Make sure the server is configured for the proper messages
+ */
+ List<Role> roles = srv.getRoles();
+ for ( Role role : roles ) {
+ switch (role) {
+ case POLICY_REPOSITORY:
+ {
+ rpcSrv.addMessageList(this.policyRepositoryMessages);
+ }
+ break;
+ case ENDPOINT_REGISTRY:
+ {
+ rpcSrv.addMessageList(this.endpointRegistryMessages);
+ }
+ break;
+ case OBSERVER:
+ {
+ rpcSrv.addMessageList(this.observerMessages);
+ }
+ break;
+ default:
+ {
+ logger.warn("Invalid Role {}", role );
+ }
+ break;
+ }
+ }
+
+ srv.setRpcServer(rpcSrv);
+ opflexServers.put(srv.getId(), srv);
+
+ new Thread() {
+ private RpcServer server;
+
+ public Thread initializeServerParams(RpcServer server) {
+ this.server = server;
+ return this;
+ }
+ @Override
+ public void run() {
+ try {
+ server.start();
+ } catch (Exception e) {
+ logger.warn("Exception starting new server {}", e);
+ }
+ }
+ }.initializeServerParams(rpcSrv).start();
+
+ }
+
+ private void addServers(List<OpflexRpcServer> idMap) {
+ /*
+ * Check to see if there's already a server
+ * with this identity, and if so, close it
+ * and replace it with this one.
+ */
+ for ( OpflexRpcServer srv: idMap ) {
+ OpflexRpcServer server = opflexServers.get(srv.getId());
+ if (server != null) {
+ if ( !server.sameServer(srv)) {
+ OpflexRpcServer oldServer = opflexServers.remove(srv.getId());
+ oldServer.getRpcServer().getChannel().disconnect();
+ launchRpcServer(srv);
+ }
+ }
+ else {
+ launchRpcServer(srv);
+ }
+ }
+ }
+
+ private void dropServers(List<String> oldServers) {
+ OpflexRpcServer server;
+
+ /*
+ * Check to see if there's already a server
+ * with this identity, and if so, close it
+ * and replace it with this one.
+ */
+ for (String identity: oldServers) {
+ if (opflexServers.containsKey(identity)) {
+ server = opflexServers.remove(identity);
+ server.getRpcServer().getChannel().disconnect();
+ }
+ }
+ }
+
+ public void startOpflexManager() {
+ opflexAgents = new ConcurrentHashMap<String, OpflexConnection>();
+ opflexServers = new ConcurrentHashMap<String, OpflexRpcServer>();
+ brokerMap = new ConcurrentHashMap<String, List<RpcCallback>>();
+
+ /*
+ * Check configuration to see which listeners we should be creating
+ */
int listenPort = defaultOpflexPort;
String portString = System.getProperty(OPFLEX_LISTENPORT);
if (portString != null) {
listenPort = Integer.decode(portString).intValue();
}
opflexListenPort = listenPort;
- this.messageMap = new JsonRpcMessageMap();
- // TODO: Add supported OpFlex messages/methods to map
-
- startOpflexManager();
+ String listenIp = defaultOpflexIp;
+ String ipString = System.getProperty(OPFLEX_LISTENIP);
+ if (ipString != null) {
+ listenIp = ipString;
+ }
+ opflexListenIp = listenIp;
+
+ /*
+ * Set up the messages supported by each OpFlex policy
+ * component
+ */
+ policyRepositoryMessages = new ArrayList<RpcMessage>();
+ endpointRegistryMessages = new ArrayList<RpcMessage>();
+ observerMessages = new ArrayList<RpcMessage>();
+
+ IdentityRequest idRequest = new IdentityRequest();
+ policyRepositoryMessages.add(idRequest);
+ endpointRegistryMessages.add(idRequest);
+ observerMessages.add(idRequest);
+
+ /* this class implements identity handlers */
+ subscribe(idRequest, this);
+
+ IdentityResponse idResponse = new IdentityResponse();
+ policyRepositoryMessages.add(idResponse);
+ endpointRegistryMessages.add(idResponse);
+ observerMessages.add(idResponse);
+
+ initializeServers();
}
-
+
/**
- * Function called by the dependency manager before the services exported by
- * the component are unregistered, this will be followed by a "destroy ()"
- * calls
+ * Stop the OpFlex Connection Service. This shuts down all active
+ * connections and servers.
*/
public void stopping() {
- for (Connection connection : opflexConnections.values()) {
- connection.disconnect();
+ for (OpflexConnection connection : opflexAgents.values()) {
+ connection.getEndpoint().getChannel().disconnect();
+ }
+ for (OpflexRpcServer server : opflexServers.values() ) {
+ if (server.getRpcServer().getChannel() != null) {
+ server.getRpcServer().getChannel().disconnect();
+ }
}
- serverListenChannel.disconnect();
}
- // TODO: Add sending of Identity message to peer
- private void handleNewConnection(String identifier, Channel channel, OpflexConnectionService instance) throws InterruptedException, ExecutionException {
- Connection connection = new Connection(identifier, channel);
+ /**
+ * Remove the OpFlex connection/agent from the map
+ *
+ * @param identifier The identity of the connection that was closed
+ */
+ public void removeConnection(String identifier) {
+ opflexAgents.remove(identifier);
+ }
+
+ /**
+ * Add a server with the given identity
+ *
+ * @param identity The IP address/socket pair for the server
+ * @param server The instantiated server
+ */
+ public void addServer(String identity, OpflexRpcServer server) {
+ opflexServers.put(identity, server);
+ }
+
+ /**
+ * Implemented from the AutoCloseable interface.
+ */
+ @Override
+ public void close() throws ExecutionException, InterruptedException {
- ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ if (dataProvider != null) {
+ WriteTransaction t = dataProvider.newWriteOnlyTransaction();
+ t.delete(LogicalDatastoreType.CONFIGURATION, DISCOVERY_DEFINITIONS_IID);
+ t.commit().get();
+ }
+ }
- JsonRpcEndpoint endpoint = new JsonRpcEndpoint(objectMapper, channel, messageMap);
- JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(endpoint);
- channel.pipeline().addLast(binderHandler);
+ @Override
+ public void onDataChanged( final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject>change) {
- connection.setEndpoint(endpoint);
- opflexConnections.put(identifier, connection);
+ List<String> addList = new ArrayList<String>();
+ List <String> dropList = new ArrayList<String>();
- ChannelConnectionHandler handler = new ChannelConnectionHandler();
- handler.setPeer(connection);
- handler.setConnectionService(this);
- ChannelFuture closeFuture = channel.closeFuture();
- closeFuture.addListener(handler);
- // Keeping the Initial inventory update(s) on its own thread.
- new Thread() {
- Connection connection;
- String identifier;
+ /* Get the new list of configured servers */
+ List<OpflexRpcServer> serverList = createServerList();
- @Override
- public void run() {
- try {
- initializeInventoryForNewNode(connection);
- } catch (InterruptedException | ExecutionException e) {
- logger.error("Failed to initialize inventory for node with identifier " + identifier, e);
- opflexConnections.remove(identifier);
- }
- }
- public Thread initializeConnectionParams(String identifier, Connection connection) {
- this.identifier = identifier;
- this.connection = connection;
- return this;
+ /*
+ * Create a list of new servers by skipping any servers in the
+ * list that are already configured (i.e. same IP/socket and set
+ * of roles) -- no need to take them down
+ */
+ for ( OpflexRpcServer srv : serverList ) {
+ OpflexRpcServer s = opflexServers.get(srv.getId());
+ if (s != null && s.getRoles().containsAll(srv.getRoles())) {
+ continue;
}
- }.initializeConnectionParams(identifier, connection).start();
+ addList.add(srv.getId());
+ }
+
+ /*
+ * We need to find out if there are any servers that
+ * we have to drop. This is the set of servers that
+ * are already running but don't appear in the configured
+ * list. This just requires a check against the IP/port
+ * (i.e. no need to check role).
+ */
+ Set <String> dropSet = opflexServers.keySet();
+ dropSet.removeAll(addList);
+ dropList.addAll(dropSet);
+
+ /* remove deleted servers first */
+ dropServers(dropList);
+ addServers(serverList);
}
- public void channelClosed(Connection peer) throws Exception {
- logger.info("Connection to Node : {} closed", peer);
- this.opflexConnections.remove(peer);
+ @Override
+ public void subscribe(RpcMessage message, RpcCallback callback) {
+
+ /*
+ * Create a new list, replacing the old
+ */
+ List<RpcCallback> cbList = brokerMap.get(message.getName());
+ if(cbList == null) {
+ cbList = new ArrayList<RpcCallback>();
+ cbList.add(callback);
+ brokerMap.put(message.getName(), cbList);
+ }
+ else
+ if(!cbList.contains(callback)) {
+ cbList.add(callback);
+ brokerMap.replace(message.getName(), cbList);
+ }
}
- private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException {
- Channel channel = connection.getChannel();
- InetAddress address = ((InetSocketAddress)channel.remoteAddress()).getAddress();
- int port = ((InetSocketAddress)channel.remoteAddress()).getPort();
- // TODO: Left-over place holder... should we keep this information around?
+ @Override
+ public void publish(JsonRpcEndpoint endpoint, RpcMessage message) {
+ List <RpcCallback> cbList = brokerMap.get(message.getName());
+ if (cbList == null) {
+ System.out.println("Unhandled Message name is " + message.getName());
+ return;
+ }
+
+ for (RpcCallback cb : cbList ) {
+ cb.callback(endpoint, message);
+ }
}
- private void startOpflexManager() {
- new Thread() {
- @Override
- public void run() {
- opflexManager();
+ @Override
+ public void callback(JsonRpcEndpoint endpoint, RpcMessage message) {
+
+ IdentityResponse.Result result = new IdentityResponse.Result();
+
+ List<IdentityResponse.Peer> peers =
+ new ArrayList<IdentityResponse.Peer>();
+
+ IdentityResponse response = new IdentityResponse();
+
+ /*
+ * We find our role by matching the parent Channel (couldn't
+ * come up with an easier way to do this, as we're trying to
+ * match against the configured identity -- decided against
+ * using the channel's connection b/c things like wildcard
+ * addresses make this comparison tricky). There's also a
+ * minute possibility that the parent socket has been deleted
+ * (e.g. due to reconfiguration) in which case, the peers list
+ * will provide the updated information.
+ */
+ OpflexRpcServer srv = null;
+ List<String> myRoles = new ArrayList<String>();
+ List<OpflexRpcServer> servers =
+ new ArrayList<OpflexRpcServer>(opflexServers.values());
+ for (OpflexRpcServer server : servers) {
+ if (server.getRpcServer().getChannel() == endpoint.getChannel().parent()) {
+ /* this is our server */
+ List<Role> roles = server.getRoles();
+ if (roles != null) {
+ for ( Role r : roles ) {
+ myRoles.add(r.toString());
+ }
+ }
+ srv = server;
+ break;
}
- }.start();
- }
+ }
+ result.setMy_role(myRoles);
+
+ /*
+ * The peers field contains the identifiers other than my_role
+ */
+ for (OpflexRpcServer server : servers) {
+ /* Skip our server -- reported in my_role */
+ if ( server.getId() == srv.getId())
+ continue;
+ List<Role> roles = server.getRoles();
+ if (roles != null) {
+ for ( Role r : roles ) {
+ IdentityResponse.Peer peer = new IdentityResponse.Peer();
+ peer.setConnectivity_info(server.getId());
+ peer.setRole(r.toString());
+ peers.add(peer);
+ }
+ }
+ }
+ result.setPeers(peers);
+ response.setResult(result);
- private void opflexManager() {
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
+ response.setId(message.getId());
+
+ /*
+ * Collect the set of severs and send in the response
+ */
try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 100)
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel channel) throws Exception {
- logger.debug("New Passive channel created : "+ channel.toString());
- InetAddress address = channel.remoteAddress().getAddress();
- int port = channel.remoteAddress().getPort();
- String identifier = address.getHostAddress()+":"+port;
- channel.pipeline().addLast(
- new LoggingHandler(LogLevel.INFO),
- new JsonRpcDecoder(100000),
- new StringEncoder(CharsetUtil.UTF_8));
-
- handleNewConnection(identifier, channel, OpflexConnectionService.this);
- logger.debug("Connected Node : "+identifier);
- }
- });
- b.option(ChannelOption.TCP_NODELAY, true);
- b.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
- // Start the server.
- ChannelFuture f = b.bind(opflexListenPort).sync();
- serverListenChannel = f.channel();
- // Wait until the server socket is closed.
- serverListenChannel.closeFuture().sync();
- } catch (InterruptedException e) {
- logger.error("Thread interrupted", e);
- } finally {
- // Shut down all event loops to terminate all threads.
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
+ endpoint.sendResponse(response);
+ }
+ catch ( Throwable e ) {
+ logger.error("Throwable for sending {}, {}", message, e);
}
}
+
+ @Override
+ public void addConnection(JsonRpcEndpoint endpoint) {
+ List<Role> roles = new ArrayList<Role>();
+ OpflexConnection agent = new OpflexConnection();
+ agent.setEndpoint(endpoint);
+ agent.setIdentity(endpoint.getIdentifier());
+
+ if (endpoint.supportsMessages(policyRepositoryMessages)) {
+ roles.add(Role.POLICY_REPOSITORY);
+ }
+ if (endpoint.supportsMessages(endpointRegistryMessages)) {
+ roles.add(Role.ENDPOINT_REGISTRY);
+ }
+ if (endpoint.supportsMessages(observerMessages)) {
+ roles.add(Role.OBSERVER);
+ }
+ agent.setRoles(roles);
+ logger.warn("Adding agent {}", endpoint.getIdentifier());
+ opflexAgents.put(endpoint.getIdentifier(), agent);
+ }
+
+ @Override
+ public void channelClosed(JsonRpcEndpoint peer) throws Exception {
+ logger.info("Connection to Node : {} closed", peer.getIdentifier());
+ opflexAgents.remove(peer.getIdentifier());
+ }
+
}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.List;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize
+@JsonDeserialize
+public class PolicyResolutionRequest extends RpcMessage {
+
+ public static final String RESOLVE_MESSAGE = "resolve_policy";
+
+ static public class Params {
+ private String subject;
+ private String context;
+ private String policy_name;
+ private String on_behalf_of; // TODO: Make URI type
+ private String data;
+ public String getSubject() {
+ return subject;
+ }
+ public void setSubject(String subject) {
+ this.subject = subject;
+ }
+ public String getContext() {
+ return context;
+ }
+ public void setContext(String context) {
+ this.context = context;
+ }
+ public String getPolicy_name() {
+ return policy_name;
+ }
+ public void setPolicy_name(String policy_name) {
+ this.policy_name = policy_name;
+ }
+ public String getOn_behalf_of() {
+ return on_behalf_of;
+ }
+ public void setOn_behalf_of(String on_behalf_of) {
+ this.on_behalf_of = on_behalf_of;
+ }
+ public String getData() {
+ return data;
+ }
+ public void setData(String data) {
+ this.data = data;
+ }
+ }
+ private String id;
+ private String method;
+ private List<Params> params;
+
+ @JsonIgnore
+ private String name;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getMethod() {
+ return method;
+ }
+
+ @Override
+ public void setMethod(String method) {
+ this.method = method;
+ }
+
+ public List<Params> getParams() {
+ return this.params;
+ }
+
+ public void setParams(List<Params> params) {
+ this.params = params;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public PolicyResolutionRequest(String name) {
+ this.name = name;
+ }
+
+ public PolicyResolutionRequest() {
+ this.name = RESOLVE_MESSAGE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.List;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize
+@JsonDeserialize
+public class PolicyResolutionResponse extends RpcMessage {
+
+ public static final String POLICY_MESSAGE_RESPONSE = "resolve_policy_response";
+
+ static public class Result {
+ private List<String> policy; // TODO: replace with MOs
+ private int prr;
+ public List<String> getPolicy() {
+ return policy;
+ }
+ public void setPolicy(List<String> policy) {
+ this.policy = policy;
+ }
+ public int getPrr() {
+ return prr;
+ }
+ public void setPrr(int prr) {
+ this.prr = prr;
+ }
+ }
+ static public class Error {
+ private String message;
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+ }
+ private String id;
+ private Result result;
+ private Error error;
+
+ @JsonIgnore
+ private String name;
+ @JsonIgnore
+ private String method;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public Error getError() {
+ return error;
+ }
+
+ public void setError(Error error) {
+ this.error = error;
+ }
+
+ @Override
+ public String getMethod() {
+ return null;
+ }
+
+ @Override
+ public void setMethod(String method) {
+ }
+
+ public Result getResult() {
+ return this.result;
+ }
+
+ public void setResult(Result result) {
+ this.result = result;
+ }
+
+ public PolicyResolutionResponse(String name) {
+ this.name = name;
+ }
+
+ public PolicyResolutionResponse() {
+ this.name = POLICY_MESSAGE_RESPONSE;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.List;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize
+@JsonDeserialize
+public class PolicyTriggerRequest extends RpcMessage {
+
+ public static final String TRIGGER_MESSAGE = "trigger_policy";
+
+ static public class Params {
+ private String policy_type;
+ private String context;
+ private String policy_name;
+ private int prr;
+ public String getPolicy_type() {
+ return policy_type;
+ }
+ public void setPolicy_type(String policy_type) {
+ this.policy_type = policy_type;
+ }
+ public String getContext() {
+ return context;
+ }
+ public void setContext(String context) {
+ this.context = context;
+ }
+ public String getPolicy_name() {
+ return policy_name;
+ }
+ public void setPolicy_name(String policy_name) {
+ this.policy_name = policy_name;
+ }
+ public int getPrr() {
+ return prr;
+ }
+ public void setPrr(int prr) {
+ this.prr = prr;
+ }
+ }
+ private String id;
+ private String method;
+ private List<Params> params;
+
+ @JsonIgnore
+ private String name;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getMethod() {
+ return method;
+ }
+
+ @Override
+ public void setMethod(String method) {
+ this.method = method;
+ }
+
+ public List<Params> getParams() {
+ return this.params;
+ }
+
+ public void setParams(List<Params> params) {
+ this.params = params;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public PolicyTriggerRequest(String name) {
+ this.name = name;
+ }
+
+ public PolicyTriggerRequest() {
+ this.name = TRIGGER_MESSAGE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize
+@JsonDeserialize
+public class PolicyTriggerResponse extends RpcMessage {
+
+ public static final String TRIGGER_MESSAGE_RESPONSE = "trigger_policy_response";
+
+ static public class Result {
+ }
+ static public class Error {
+ private String message;
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+ }
+ private String id;
+ private Result result;
+ private Error error;
+
+ @JsonIgnore
+ private String name;
+ @JsonIgnore
+ private String method;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public Error getError() {
+ return error;
+ }
+
+ public void setError(Error error) {
+ this.error = error;
+ }
+
+ @Override
+ public String getMethod() {
+ return null;
+ }
+
+ @Override
+ public void setMethod(String method) {
+ }
+
+ public Result getResult() {
+ return this.result;
+ }
+
+ public void setResult(Result result) {
+ this.result = result;
+ }
+
+ public PolicyTriggerResponse(String name) {
+ this.name = name;
+ }
+
+ public PolicyTriggerResponse() {
+ this.name = TRIGGER_MESSAGE_RESPONSE;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.List;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize
+@JsonDeserialize
+public class PolicyUpdateRequest extends RpcMessage {
+
+ public static final String UPDATE_MESSAGE = "update_policy";
+
+ static public class Params {
+ private String context;
+ private List<String> subtree;
+ private int prr;
+ public String getContext() {
+ return context;
+ }
+ public void setContext(String context) {
+ this.context = context;
+ }
+ public List<String> getSubtree() {
+ return subtree;
+ }
+ public void setSubtree(List<String> subtree) {
+ this.subtree = subtree;
+ }
+ public int getPrr() {
+ return prr;
+ }
+ public void setPrr(int prr) {
+ this.prr = prr;
+ }
+ }
+ private String id;
+ private String method;
+ private List<Params> params;
+
+ @JsonIgnore
+ private String name;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getMethod() {
+ return method;
+ }
+
+ @Override
+ public void setMethod(String method) {
+ this.method = method;
+ }
+
+ public List<Params> getParams() {
+ return this.params;
+ }
+
+ public void setParams(List<Params> params) {
+ this.params = params;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public PolicyUpdateRequest(String name) {
+ this.name = name;
+ }
+
+ public PolicyUpdateRequest() {
+ this.name = UPDATE_MESSAGE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize
+@JsonDeserialize
+public class PolicyUpdateResponse extends RpcMessage {
+
+ public static final String UPDATE_MESSAGE_RESPONSE = "update_policy_response";
+
+ static public class Result {
+ }
+ static public class Error {
+ private String message;
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+ }
+ private String id;
+ private Result result;
+ private Error error;
+
+ @JsonIgnore
+ private String name;
+ @JsonIgnore
+ private String method;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public Error getError() {
+ return error;
+ }
+
+ public void setError(Error error) {
+ this.error = error;
+ }
+
+ @Override
+ public String getMethod() {
+ return null;
+ }
+
+ @Override
+ public void setMethod(String method) {
+ }
+
+ public Result getResult() {
+ return this.result;
+ }
+
+ public void setResult(Result result) {
+ this.result = result;
+ }
+
+ public PolicyUpdateResponse(String name) {
+ this.name = name;
+ }
+
+ public PolicyUpdateResponse() {
+ this.name = UPDATE_MESSAGE_RESPONSE;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import java.util.List;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize
+@JsonDeserialize
+public class StateReportRequest extends RpcMessage {
+
+ public static final String STATE_MESSAGE = "report_state";
+
+ static public class Params {
+ private String subject;
+ private String context;
+ private String object; // TODO: change to MOs
+ private List<String> fault;
+ private List<String> event;
+ private List<String> statistics;
+ private List<String> health;
+ public String getSubject() {
+ return subject;
+ }
+ public void setSubject(String subject) {
+ this.subject = subject;
+ }
+ public String getContext() {
+ return context;
+ }
+ public void setContext(String context) {
+ this.context = context;
+ }
+ public String getObject() {
+ return object;
+ }
+ public void setObject(String object) {
+ this.object = object;
+ }
+ public List<String> getFault() {
+ return fault;
+ }
+ public void setFault(List<String> fault) {
+ this.fault = fault;
+ }
+ public List<String> getEvent() {
+ return event;
+ }
+ public void setEvent(List<String> event) {
+ this.event = event;
+ }
+ public List<String> getStatistics() {
+ return statistics;
+ }
+ public void setStatistics(List<String> statistics) {
+ this.statistics = statistics;
+ }
+ public List<String> getHealth() {
+ return health;
+ }
+ public void setHealth(List<String> health) {
+ this.health = health;
+ }
+ }
+ private String id;
+ private String method;
+ private List<Params> params;
+
+ @JsonIgnore
+ private String name;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getMethod() {
+ return method;
+ }
+
+ @Override
+ public void setMethod(String method) {
+ this.method = method;
+ }
+
+ public List<Params> getParams() {
+ return this.params;
+ }
+
+ public void setParams(List<Params> params) {
+ this.params = params;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public StateReportRequest(String name) {
+ this.name = name;
+ }
+
+ public StateReportRequest() {
+ this.name = STATE_MESSAGE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize
+@JsonDeserialize
+public class StateReportResponse extends RpcMessage {
+
+ public static final String REPORT_MESSAGE_RESPONSE = "trigger_policy_response";
+
+ static public class Result {
+ }
+ static public class Error {
+ private String message;
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+ }
+
+ private String id;
+ private Result result;
+ private Error error;
+
+ @JsonIgnore
+ private String name;
+ @JsonIgnore
+ private String method;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public Error getError() {
+ return error;
+ }
+
+ public void setError(Error error) {
+ this.error = error;
+ }
+
+ @Override
+ public String getMethod() {
+ return null;
+ }
+
+ @Override
+ public void setMethod(String method) {
+ }
+
+ public Result getResult() {
+ return this.result;
+ }
+
+ public void setResult(Result result) {
+ this.result = result;
+ }
+
+ public StateReportResponse(String name) {
+ this.name = name;
+ }
+
+ public StateReportResponse() {
+ this.name = REPORT_MESSAGE_RESPONSE;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+}
uses config:service-ref {
refine type {
mandatory true;
- config:required-identity mdsal:binding-data-broker;
+ config:required-identity mdsal:binding-async-data-broker;
}
}
}
"The nodes that any OpFlex agent needs to communicate
with in an OpFlex policy fabric.";
- config true;
list policy-repository {
description
"A repository that the OpFlex agent can use for resolving
leaf id {
description
- "The id for the policy repository.";
- type inet:host;
- mandatory true;
+ "The id for the policy repository.";
+ type string;
+ config true;
}
leaf port {
description
- "The port number to use for the connection";
- type inet:port-number;
- mandatory true;
+ "The port number to use for the connection";
+ type int32;
+ config true;
}
leaf serialization-type {
description
"The serialization to use for this connection.";
type serialization;
- mandatory true;
+ config true;
}
}
leaf id {
description
- "The id for the endpoint registry.";
- type inet:host;
- mandatory true;
+ "The id for the endpoint registry.";
+ type string;
+ config true;
}
leaf port {
description
- "The port number to use for the connection";
- type inet:port-number;
- mandatory true;
+ "The port number to use for the connection";
+ type int32;
+ config true;
}
leaf serialization-type {
description
"The serialization to use for this connection.";
type serialization;
- mandatory true;
+ config true;
}
}
leaf id {
description
- "The id for the Observer.";
- type inet:host;
- mandatory true;
+ "The id for the Observer.";
+ type string;
+ config true;
}
leaf port {
description
- "The port number to use for the connection";
- type inet:port-number;
- mandatory true;
+ "The port number to use for the connection";
+ type int32;
+ config true;
}
leaf serialization-type {
description
"The serialization to use for this connection.";
type serialization;
- mandatory true;
+ config true;
}
}
}
package org.opendaylight.groupbasedpolicy.jsonrpc;
import static io.netty.buffer.Unpooled.copiedBuffer;
-
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import io.netty.channel.embedded.EmbeddedChannel;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.JsonNode;
-
import io.netty.util.CharsetUtil;
+import java.util.ArrayList;
+import java.util.List;
+
import org.junit.Before;
import org.junit.Test;
-import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcDecoder;
-import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
-import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcMessage;
-import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcMessageMap;
-import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcServiceBinderHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class JsonRpcEndpointTest {
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class JsonRpcEndpointTest implements RpcBroker, RpcBroker.RpcCallback {
protected static final Logger logger = LoggerFactory.getLogger(JsonRpcEndpoint.class);
- static final String TEST_JSON_CLASS_NAME = "policy_update";
+ static final String TEST_JSON_CLASS_NAME = "send_identity";
// Used for message generation, single property
static final String simpleMessage = "{\"otherstuff\": \"foobar\"}";
// Used for testing valid incoming JSONRPC request messages
- static final String testRequest =
+ static final String testRequest =
"{ \"id\":\"2da9e3d7-0bbe-4099-b343-12783777452f\"," +
"\"method\":" + "\"" + TEST_JSON_CLASS_NAME + "\",\"params\":null}";
// Used for testing invalid incoming JSONRPC request messages
- static final String testBadRequest =
+ static final String testBadRequest =
"{ \"id\":\"2da9e3d7-0bbe-4099-b343-12783777452f\"," +
"\"method\":\"foobar\",\"params\":[]}";
// Used for testing valid incoming JSONRPC echo request messages
- static final String testEchoRequest =
+ static final String testEchoRequest =
"{ \"id\":\"2da9e3d7-0bbe-4099-b343-12783777452f\"," +
"\"method\":\"echo\",\"params\":[]}";
// Used for testing invalid incoming JSONRPC response messages
- static final String unknownResponse =
+ static final String unknownResponse =
"{ \"id\":\"2da9e3d7-0bbe-4099-b343-12783777452f\"," +
"\"result\":\"foobar\",\"error\":null}";
+ static final String opflexIdentityRequest =
+ "{ \"id\":\"2da9e3d7-0bbe-4099-b343-12783777452f\"," +
+ "\"method\":" + "\"" + TEST_JSON_CLASS_NAME + "\",\"params\": [ {" +
+ "\"name\": \"will\", \"domain\": \"robinson\"," +
+ "\"my_role\": [\"policy_element\", \"policy_repository\"]} ] }";
private JsonRpcDecoder decoder;
private EmbeddedChannel channel;
private JsonRpcEndpoint endpoint;
- private JsonRpcMessageMap messageMap;
+ private RpcMessageMap messageMap;
private static boolean testTriggerFlag;
+ @Override
+ public void subscribe(RpcMessage message, RpcCallback callback) {
+ }
+
+ @Override
+ public void publish(JsonRpcEndpoint endpoint, RpcMessage message) {
+ testTriggerFlag = true;
+ callback(endpoint, message);
+ }
+
+ @JsonDeserialize
+ static final public class Params {
+ private String name;
+ private String domain;
+ private List<String> my_role;
+ public String getName() {
+ return name;
+ }
+ public void setName(String name) {
+ this.name = name;
+ }
+ public String getDomain() {
+ return domain;
+ }
+ public void setDomain(String domain) {
+ this.domain = domain;
+ }
+ public List<String> getMy_role() {
+ return my_role;
+ }
+ public void setMy_role(List<String> my_role) {
+ this.my_role = my_role;
+ }
+ public Params() {
+ my_role = new ArrayList<String>();
+ }
+ }
+
@JsonDeserialize
- static final class OpflexTest extends JsonRpcMessage {
+ static final class OpflexTest extends RpcMessage {
+
+ private String id;
+ private String method;
+
+ private List<Params> params;
private String otherstuff;
@JsonIgnore
private String name;
public OpflexTest() {
+ this.name = TEST_JSON_CLASS_NAME;
}
public void setOtherstuff ( String otherstuff ) {
return this.otherstuff;
}
+ public void setParams(List<Params> params) {
+ this.params = params;
+ }
+
+ public List<Params> getParams() {
+ return params;
+ }
+
@Override
public String getName() {
return this.name;
}
- @Override
+ @Override
public void setName(String name) {
this.name = name;
}
@Override
- public void invoke() {
- testTriggerFlag = true;
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getMethod() {
+ return method;
+ }
+
+ @Override
+ public void setMethod(String method) {
+ this.method = method;
}
}
-
+
+ @Override
+ public void callback(JsonRpcEndpoint endpoint, RpcMessage message) {
+
+ if (message != null && message instanceof JsonRpcEndpointTest.OpflexTest) {
+ JsonRpcEndpointTest.OpflexTest msg = (JsonRpcEndpointTest.OpflexTest)message;
+ if ( msg.getParams() == null) {
+ return;
+ }
+ List<String> roles = msg.params.get(0).getMy_role();
+ }
+ }
+
+
@Before
public void setUp() throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
/*
* Create the message map, populating with just our test message
*/
- messageMap = new JsonRpcMessageMap();
- JsonRpcEndpointTest.OpflexTest rpcMethod =
+ messageMap = new RpcMessageMap();
+ JsonRpcEndpointTest.OpflexTest rpcMethod =
new JsonRpcEndpointTest.OpflexTest();
rpcMethod.setName(TEST_JSON_CLASS_NAME);
messageMap.add(rpcMethod);
decoder = new JsonRpcDecoder(1000);
- JsonRpcServiceBinderHandler binderHandler =
+ JsonRpcServiceBinderHandler binderHandler =
new JsonRpcServiceBinderHandler(null);
channel = new EmbeddedChannel(decoder, binderHandler);
-
- endpoint = new JsonRpcEndpoint(objectMapper, channel, messageMap);
+
+ endpoint = new JsonRpcEndpoint(channel.localAddress().toString(), null,
+ objectMapper, channel, messageMap, this);
binderHandler.setEndpoint(endpoint);
}
readValue(simpleMessage, JsonRpcEndpointTest.OpflexTest.class);
testRpc.setName(TEST_JSON_CLASS_NAME);
try {
- endpoint.invoke(testRpc);
+ endpoint.sendRequest(testRpc);
Object result = channel.readOutbound();
assertTrue(result != null);
assertTrue(result.toString().contains("id"));
JsonRpcEndpointTest.OpflexTest testRpc = objectMapper.
readValue(simpleMessage, JsonRpcEndpointTest.OpflexTest.class);
testRpc.setName(TEST_JSON_CLASS_NAME);
-
+
try {
- endpoint.invoke(testRpc);
+ ListenableFuture<Object> lf = endpoint.sendRequest(testRpc);
String result = channel.readOutbound().toString();
JsonNode node = objectMapper.readValue(result, JsonNode.class);
String idValue = node.path("id").textValue();
- String foo = "{ \"id\":\"" + idValue +
+ String foo = "{ \"id\":\"" + idValue +
"\",\"result\":\"foobar\",\"error\":null}";
testTriggerFlag = false;
channel.writeInbound(copiedBuffer(foo, CharsetUtil.UTF_8));
- assertTrue(testTriggerFlag);
+ Object tmp = lf.get();
+ assertTrue(tmp instanceof JsonRpcEndpointTest.OpflexTest);
channel.finish();
} catch ( Throwable e ) {
fail();
- }
+ }
}
@Test
public void testInboundEchoRequest() throws Exception {
- testTriggerFlag = false;
channel.writeInbound(copiedBuffer(testEchoRequest, CharsetUtil.UTF_8));
Object result = channel.readOutbound();
assertTrue(result != null);
assertTrue(result.toString().contains("result"));
assertTrue(result.toString().contains("error"));
channel.finish();
- }
-}
+ }
+
+ @Test
+ public void testOpflexIdentityRequest() throws Exception {
+ testTriggerFlag = false;
+ System.out.println("OpflexIdentity Test");
+ channel.writeInbound(copiedBuffer(opflexIdentityRequest, CharsetUtil.UTF_8));
+ channel.finish();
+ assertTrue(testTriggerFlag);
+ }
+}
package org.opendaylight.groupbasedpolicy.jsonrpc;
import static io.netty.buffer.Unpooled.copiedBuffer;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import io.netty.channel.embedded.EmbeddedChannel;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
import io.netty.util.CharsetUtil;
import org.junit.Before;
import org.junit.Test;
-
-import static org.mockito.Mockito.*;
-
-import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcDecoder;
-import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
-import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcServiceBinderHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.JsonNode;
+
public class JsonRpcServiceBinderHandlerTest {
protected static final Logger logger = LoggerFactory.getLogger(JsonRpcEndpoint.class);
// Used for testing incoming JSONRPC request messages
- static final String testRequest =
+ static final String testRequest =
"{ \"id\":\"2da9e3d7-0bbe-4099-b343-12783777452f\"," +
"\"method\": \"test_foo\",\"params\":null}";
// Used for testing incoming JSONRPC response messages
- static final String testResponse =
+ static final String testResponse =
"{ \"id\":\"2da9e3d7-0bbe-4099-b343-12783777452f\"," +
"\"result\":\"foobar\",\"error\":null}";
-
+
private JsonRpcEndpoint mockEndpoint;
private JsonRpcServiceBinderHandler binderHandler;
private JsonRpcDecoder decoder;
private EmbeddedChannel channel;
-
+
@Before
public void setUp() throws Exception {
-
+
mockEndpoint = mock(JsonRpcEndpoint.class);
decoder = new JsonRpcDecoder(1000);
binderHandler = new JsonRpcServiceBinderHandler(mockEndpoint);
verify(mockEndpoint).processRequest((JsonNode)anyObject());
}
- //@Test
+ @Test
public void testResponse() throws Exception {
channel.writeInbound(copiedBuffer(testResponse, CharsetUtil.UTF_8));
channel.finish();
}
-}
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+
+package org.opendaylight.groupbasedpolicy.jsonrpc;
+
+
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RpcServerTest implements ConnectionService, RpcBroker {
+ protected static final Logger logger = LoggerFactory.getLogger(JsonRpcEndpoint.class);
+
+ private static final String TEST_IP = "127.0.0.1";
+ private static final int TEST_PORT = 53670;
+ private static boolean newConnection = false;
+ private static boolean serverClosed = false;
+
+ @Override
+ public void addConnection(JsonRpcEndpoint endpoint) {
+ }
+
+ @Override
+ public void channelClosed(JsonRpcEndpoint peer) throws Exception {
+ }
+
+ @Override
+ public void publish(JsonRpcEndpoint endpoint, RpcMessage message) {
+ }
+
+ @Override
+ public void subscribe(RpcMessage message, RpcCallback callback) {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+
+ RpcServer server = new RpcServer(TEST_IP, TEST_PORT);
+ server.setConnectionService(this);
+ server.setRpcBroker(this);
+
+ }
+
+
+ //@Test
+ public void testStartup() throws Exception {
+
+ }
+
+ //@Test
+ public void testShutdown() throws Exception {
+
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import static io.netty.buffer.Unpooled.copiedBuffer;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.util.CharsetUtil;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcDecoder;
+import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
+import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcServiceBinderHandler;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessageMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DiscoveryDefinitions;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DiscoveryDefinitionsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.EndpointRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.EndpointRegistryBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.Observer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.ObserverBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.PolicyRepository;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.PolicyRepositoryBuilder;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ *
+ * Test the serialization and deserialization of RPC Messages,
+ * and check against expected structure and values.
+ */
+public class OpflexConnectionServiceTest {
+ protected static final Logger logger = LoggerFactory.getLogger(OpflexMessageTest.class);
+
+ static private final String TEST_RPC_MESSAGE_NAME = "test_message";
+
+ static private final String TEST_EP_UUID = "85d53c32-47af-4eaf-82fd-ced653ff74da";
+ static private final String TEST_ID_UUID = "788950f6-2279-4ae1-820e-d277cea3623c";
+ static public final String TEST_IP = "127.0.0.1";
+ static public final String TEST_PORT = "57563";
+
+ static private final String ID_UUID = "2da9e3d7-0bbe-4099-b343-12783777452f";
+ static private final String SEND_IDENTITY = "send_identity";
+ static private final String POLICY_REQUEST = "resolve_policy";
+ static private final String DOMAIN_UUID = "75caaff2-cb4f-4509-b45e-47b447cb35a9";
+ static private final String NAME = "vm1";
+ static private final String IDENTITY = "192.168.0.1:56732";
+ static private final String opflexIdentityRequest =
+ "{ \"id\": \"" + ID_UUID + "\"," +
+ " \"method\": \"" + SEND_IDENTITY + "\"," +
+ " \"params\": [ {" +
+ " \"name\": \"" + NAME + "\"," +
+ " \"domain\": \"" + DOMAIN_UUID + "\"," +
+ " \"my_role\": [\"" + OpflexConnectionService.Role.POLICY_ELEMENT.toString() + "\"]" +
+ " }] }";
+
+ @Mock
+ private DataBroker mockDataBroker;
+ private DiscoveryDefinitionsBuilder discoveryBuilder;
+ private EndpointRegistryBuilder eprBuilder;
+ private PolicyRepositoryBuilder prBuilder;
+ private ObserverBuilder oBuilder;
+ private DiscoveryDefinitions dummyDefinitions;
+ private List<EndpointRegistry> registries;
+ private List<PolicyRepository> repositories;
+ private List<Observer> observers;
+ private OpflexConnectionService opflexService;
+ @Mock
+ private EmbeddedChannel mockChannel;
+ @Mock
+ private JsonRpcEndpoint mockEp;
+ private JsonRpcDecoder decoder;
+ @Mock
+ private ReadOnlyTransaction mockRead;
+ @Mock
+ private ListenableFuture<Optional<DataObject>> mockOption;
+ @Mock
+ private Optional<DataObject> mockDao;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+
+ /*
+ * Mocks
+ */
+ when(mockDataBroker.newReadOnlyTransaction()).thenReturn(mockRead);
+ when(mockRead.read(LogicalDatastoreType.CONFIGURATION, OpflexConnectionService.
+ DISCOVERY_DEFINITIONS_IID)).thenReturn(mockOption);
+ when(mockOption.get()).thenReturn(mockDao);
+ when(mockDao.get()).thenReturn(dummyDefinitions);
+
+ /*
+ * Builders for creating our own discovery definitions
+ */
+ discoveryBuilder = new DiscoveryDefinitionsBuilder();
+ eprBuilder = new EndpointRegistryBuilder();
+ prBuilder = new PolicyRepositoryBuilder();
+ oBuilder = new ObserverBuilder();
+
+
+ // TODO: needs deterministic way of finding available socket
+ System.setProperty(OpflexConnectionService.OPFLEX_LISTENPORT, TEST_PORT);
+ System.setProperty(OpflexConnectionService.OPFLEX_LISTENIP, TEST_IP);
+ }
+
+
+ @Test
+ public void testNoDefinitions() throws Exception {
+
+ opflexService = new OpflexConnectionService();
+ opflexService.setDataProvider(mockDataBroker);
+ verify(mockDataBroker).newReadOnlyTransaction();
+ }
+
+ @Test
+ public void testInitialSet() throws Exception {
+ registries = new ArrayList<EndpointRegistry>();
+ repositories = new ArrayList<PolicyRepository>();
+ observers = new ArrayList<Observer>();
+ EndpointRegistry epr = eprBuilder.setId(TEST_IP)
+ .setPort(Integer.valueOf(TEST_PORT)).build();
+ PolicyRepository pr = prBuilder.setId(TEST_IP)
+ .setPort(Integer.valueOf(TEST_PORT)).build();
+ Observer o = oBuilder.setId(TEST_IP)
+ .setPort(Integer.valueOf(TEST_PORT)).build();
+ registries.add(epr);
+ repositories.add(pr);
+ observers.add(o);
+ dummyDefinitions = discoveryBuilder.setObserver(observers)
+ .setEndpointRegistry(registries)
+ .setPolicyRepository(repositories).build();
+ opflexService = new OpflexConnectionService();
+ opflexService.setDataProvider(mockDataBroker);
+ verify(mockDataBroker).newReadOnlyTransaction();
+
+ }
+
+ @Test
+ public void testAddConnection() throws Exception {
+ opflexService = new OpflexConnectionService();
+ opflexService.setDataProvider(mockDataBroker);
+
+ when(mockEp.supportsMessages(opflexService.
+ policyRepositoryMessages)).thenReturn(true);
+ when(mockEp.getIdentifier()).thenReturn(TEST_EP_UUID);
+
+
+ opflexService = new OpflexConnectionService();
+ opflexService.setDataProvider(mockDataBroker);
+ opflexService.addConnection(mockEp);
+ verify(mockEp, Mockito.times(3)).supportsMessages(opflexService.policyRepositoryMessages);
+ verify(mockEp, Mockito.times(3)).getIdentifier();
+ assertTrue(opflexService.opflexAgents.size() == 1);
+ }
+
+ @Test
+ public void testChannelClosed() throws Exception {
+ opflexService = new OpflexConnectionService();
+ opflexService.setDataProvider(mockDataBroker);
+
+ JsonRpcEndpoint mockEp = mock(JsonRpcEndpoint.class);
+
+ when(mockEp.supportsMessages(opflexService.
+ policyRepositoryMessages)).thenReturn(true);
+ when(mockEp.getIdentifier()).thenReturn(TEST_EP_UUID);
+
+
+ opflexService = new OpflexConnectionService();
+ opflexService.setDataProvider(mockDataBroker);
+ opflexService.addConnection(mockEp);
+ assertTrue(opflexService.opflexAgents.size() == 1);
+ opflexService.channelClosed(mockEp);
+ assertTrue(opflexService.opflexAgents.size() == 0);
+ }
+
+ @Test
+ public void testPublishSubscribeCallback() throws Exception {
+
+ /*
+ * This is *far* from UT, but worthwhile for now
+ */
+ opflexService = new OpflexConnectionService();
+ opflexService.setDataProvider(mockDataBroker);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ decoder = new JsonRpcDecoder(1000);
+ JsonRpcServiceBinderHandler binderHandler =
+ new JsonRpcServiceBinderHandler(null);
+ EmbeddedChannel channel = new EmbeddedChannel(decoder, binderHandler);
+
+ RpcMessageMap messageMap = new RpcMessageMap();
+ IdentityRequest rpcMsg = new IdentityRequest();
+ messageMap.add(rpcMsg);
+ JsonRpcEndpoint ep = new JsonRpcEndpoint(IDENTITY , opflexService,
+ objectMapper, channel, messageMap, opflexService);
+ binderHandler.setEndpoint(ep);
+ opflexService.addConnection(ep);
+ channel.writeInbound(copiedBuffer(opflexIdentityRequest, CharsetUtil.UTF_8));
+ Object result = channel.readOutbound();
+ assertTrue(result != null);
+ IdentityResponse resp = objectMapper.readValue(result.toString(), IdentityResponse.class);
+ assertTrue(result != null);
+ assertTrue(resp.getResult().getMy_role()
+ .contains(OpflexConnectionService.Role.ENDPOINT_REGISTRY.toString()));
+ assertTrue(resp.getResult().getMy_role()
+ .contains(OpflexConnectionService.Role.POLICY_REPOSITORY.toString()));
+ assertTrue(resp.getResult().getMy_role()
+ .contains(OpflexConnectionService.Role.OBSERVER.toString()));
+ }
+}
--- /dev/null
+/*
+ * Copyright (C) 2014 Cisco Systems, Inc.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Thomas Bachman
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.opflex;
+
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ *
+ * Test the serialization and deserialization of RPC Messages,
+ * and check against expected structure and values.
+ */
+public class OpflexMessageTest {
+ protected static final Logger logger = LoggerFactory.getLogger(OpflexMessageTest.class);
+
+ private enum Role {
+ POLICY_REPOSITORY("policy_repository"),
+ ENDPOINT_REGISTRY("endpoint_registry"),
+ OBSERVER("observer"),
+ POLICY_ELEMENT("policy_element");
+
+ private String role;
+ Role(String role) {
+ this.role = role;
+ }
+ @Override
+ public String toString() {
+ return this.role;
+ }
+ }
+
+ private static final String ID_UUID = "2da9e3d7-0bbe-4099-b343-12783777452f";
+ private static final String SEND_IDENTITY = "send_identity";
+ private static final String POLICY_REQUEST = "resolve_policy";
+ private static final String DOMAIN_UUID = "75caaff2-cb4f-4509-b45e-47b447cb35a9";
+ private static final String NAME = "vm1";
+ private static final String IDENTITY = "192.168.0.1:56732";
+ private static final String opflexIdentityRequest =
+ "{ \"id\": \"" + ID_UUID + "\"," +
+ " \"method\": \"" + SEND_IDENTITY + "\"," +
+ " \"params\": [ {" +
+ " \"name\": \"" + NAME + "\"," +
+ " \"domain\": \"" + DOMAIN_UUID + "\"," +
+ " \"my_role\": [\"" + Role.POLICY_ELEMENT.toString() + "\"]" +
+ " }] }";
+
+ private static final String opflexIdentityResponse =
+ "{ \"id\": \"" + ID_UUID + "\"," +
+ " \"error\": {}," +
+ " \"result\": {" +
+ " \"name\": \"" + NAME + "\"," +
+ " \"domain\": \"" + DOMAIN_UUID + "\"," +
+ " \"my_role\": [\"" + Role.POLICY_REPOSITORY.toString() + "\"]," +
+ " \"peers\": [" +
+ " { \"role\": \"" + Role.ENDPOINT_REGISTRY.toString() + "\"," +
+ " \"connectivity_info\": \"" + IDENTITY + "\"}," +
+ " { \"role\": \"" + Role.OBSERVER.toString() + "\"," +
+ " \"connectivity_info\": \"" + IDENTITY + "\"}" +
+ " ]" +
+ " }}";
+
+ public static final String SUBJECT = "webContract";
+ public static final String CONTEXT = "353786fd-7327-41dd-b7de-5d672e303730";
+ public static final String POLICY_NAME = "webFarmEpg";
+ public static final String URI = "ef130684-ac17-4118-ad36-8dea0babc7b2";
+ public static final String DATA = "condition:notAuthorized";
+ public static final String PRR = "100";
+
+ private static final String opflexPolicyRequest =
+ "{ \"id\": \"" + ID_UUID + "\"," +
+ " \"method\": \"" + POLICY_REQUEST + "\"," +
+ " \"params\": [ {" +
+ " \"subject\": \"" + SUBJECT + "\"," +
+ " \"context\": \"" + CONTEXT + "\"," +
+ " \"policy_name\": \"" + POLICY_NAME + "\"," +
+ " \"on_behalf_of\": \"" + URI + "\"," +
+ " \"data\": \"" + DATA + "\"" +
+ " }] }";
+
+ private static final String opflexPolicyResponse =
+ "{ \"id\": \"" + ID_UUID + "\"," +
+ " \"error\": {}," +
+ " \"result\": {" +
+ " \"policy\": [ \"" + POLICY_NAME + "\"]," +
+ " \"prr\": \"" + PRR + "\"" +
+ " }}";
+
+ private static final String opflexUpdateRequest =
+ "{ \"id\": \"" + ID_UUID + "\"," +
+ " \"method\": \"" + POLICY_REQUEST + "\"," +
+ " \"params\": [ {" +
+ " \"context\": \"" + CONTEXT + "\"," +
+ " \"subtree\": [\"" + POLICY_NAME + "\"]," +
+ " \"prr\": \"" + PRR + "\"" +
+ " }] }";
+
+ private static final String opflexUpdateResponse =
+ "{ \"id\": \"" + ID_UUID + "\"," +
+ " \"error\": {}," +
+ " \"result\": {}}";
+
+ private static final String TRIGGER_REQUEST = "trigger_policy";
+ private static final String TYPE = "someType";
+
+ private static final String opflexTriggerRequest =
+ "{ \"id\": \"" + ID_UUID + "\"," +
+ " \"method\": \"" + TRIGGER_REQUEST + "\"," +
+ " \"params\": [ {" +
+ " \"policy_type\": \"" + TYPE + "\"," +
+ " \"context\": \"" + CONTEXT + "\"," +
+ " \"policy_name\": \"" + POLICY_NAME + "\"," +
+ " \"prr\": \"" + PRR + "\"" +
+ " }] }";
+
+
+ private static final String opflexTriggerResponse =
+ "{ \"id\": \"" + ID_UUID + "\"," +
+ " \"error\": {}," +
+ " \"result\": {} }";
+
+ private static final String EP_DECLARE_REQUEST = "endpoint_declaration";
+ private static final String LOCATION = "sw3/p12";
+ private static final String EP_ID_UUID = "d90173aa-621a-4009-b5da-c930cce0918f";
+ private static final String STATUS = "attach";
+
+ private static final String opflexEpDeclareRequest =
+ "{ \"id\": \"" + ID_UUID + "\"," +
+ " \"method\": \"" + EP_DECLARE_REQUEST + "\"," +
+ " \"params\": [ {" +
+ " \"subject\": \"" + SUBJECT + "\"," +
+ " \"context\": \"" + CONTEXT + "\"," +
+ " \"policy_name\": \"" + POLICY_NAME + "\"," +
+ " \"location\": \"" + LOCATION + "\"," +
+ " \"identifier\": [\"" + EP_ID_UUID + "\"]," +
+ " \"data\": [\"" + DATA + "\"]," +
+ " \"status\": \"" + STATUS + "\"," +
+ " \"prr\": \"" + PRR + "\"" +
+ " }] }";
+
+ private static final String opflexEpDeclareResponse =
+ "{ \"id\": \"" + ID_UUID + "\"," +
+ " \"error\": {}," +
+ " \"result\": {} }";
+
+ private static final String ENDPOINT_REQUEST = "endpoint_request";
+ private static final String opflexEpRequest =
+ "{ \"id\": \"" + ID_UUID + "\"," +
+ " \"method\": \"" + ENDPOINT_REQUEST + "\"," +
+ " \"params\": [ {" +
+ " \"subject\": \"" + SUBJECT + "\"," +
+ " \"context\": \"" + CONTEXT + "\"," +
+ " \"identifier\": [\"" + EP_ID_UUID + "\"]" +
+ " }] }";
+
+ private static final String opflexEpResponse =
+ "{ \"id\": \"" + ID_UUID + "\"," +
+ " \"error\": {}," +
+ " \"result\": {" +
+ " \"endpoint\": [{" +
+ " \"subject\": \"" + SUBJECT + "\"," +
+ " \"context\": \"" + CONTEXT + "\"," +
+ " \"policy_name\": \"" + POLICY_NAME + "\"," +
+ " \"location\": \"" + LOCATION + "\"," +
+ " \"identifier\": [\"" + EP_ID_UUID + "\"]," +
+ " \"data\": [\"" + DATA + "\"]," +
+ " \"status\": \"" + STATUS + "\"," +
+ " \"prr\": \"" + PRR + "\"" +
+ " }]" +
+ " }}";
+
+ private static final String EP_POLICY_UDPATE_REQUEST = "endpoint_update_policy";
+ private static final String opflexEpPolicyUpdateRequest =
+ "{ \"id\": \"" + ID_UUID + "\"," +
+ " \"method\": \"" + EP_POLICY_UDPATE_REQUEST + "\"," +
+ " \"params\": [ {" +
+ " \"subject\": \"" + SUBJECT + "\"," +
+ " \"context\": \"" + CONTEXT + "\"," +
+ " \"policy_name\": \"" + POLICY_NAME + "\"," +
+ " \"location\": \"" + LOCATION + "\"," +
+ " \"identifier\": [\"" + EP_ID_UUID + "\"]," +
+ " \"data\": [\"" + DATA + "\"]," +
+ " \"status\": \"" + STATUS + "\"," +
+ " \"ttl\": \"" + PRR + "\"" +
+ " }] }";
+
+
+ private static final String opflexEpPolicyUpdateResponse =
+ "{ \"id\": \"" + ID_UUID + "\"," +
+ " \"error\": {}," +
+ " \"result\": {} }";
+
+ private static final String STATE_REQUEST = "report_state";
+ private static final String OBJECT = "ep101";
+ private static final String FAULT = "ep102";
+ private static final String EVENT = "infected";
+ private static final String STATISTICS = "rxPackets: 20";
+ private static final String HEALTH = ".98";
+ private static final String opflexStateRequest =
+ "{ \"id\": \"" + ID_UUID + "\"," +
+ " \"method\": \"" + STATE_REQUEST + "\"," +
+ " \"params\": [ {" +
+ " \"subject\": \"" + SUBJECT + "\"," +
+ " \"context\": \"" + CONTEXT + "\"," +
+ " \"object\": \"" + OBJECT + "\"," +
+ " \"fault\": [\"" + FAULT + "\"]," +
+ " \"event\": [\"" + EVENT + "\"]," +
+ " \"statistics\": [\"" + STATISTICS + "\"]," +
+ " \"health\": [\"" + HEALTH + "\"]" +
+ " }] }";
+
+
+
+ private static final String opflexStateResponse =
+ "{ \"id\": \"" + ID_UUID + "\"," +
+ " \"error\": {}," +
+ " \"result\": {} }";
+
+ @Before
+ public void setUp() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ }
+
+
+ @Test
+ public void testIdentityRequest() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+ RpcMessage rpcMsg = objectMapper.
+ readValue(opflexIdentityRequest, IdentityRequest.class);
+ assertTrue(rpcMsg instanceof IdentityRequest);
+ IdentityRequest opflexRequest = (IdentityRequest)rpcMsg;
+ assertTrue(opflexRequest.getId().equals(ID_UUID));
+ assertTrue(opflexRequest.getMethod().equals(SEND_IDENTITY));
+ assertTrue(opflexRequest.getParams()
+ .get(0).getDomain().equals(DOMAIN_UUID));
+ assertTrue(opflexRequest.getParams()
+ .get(0).getName().equals(NAME));
+ assertTrue(opflexRequest.getParams()
+ .get(0).getMy_role().get(0).equals(Role.POLICY_ELEMENT.toString()));
+ assertTrue(opflexRequest.getParams()
+ .get(0).getName().equals(NAME));
+ }
+
+ @Test
+ public void testIdentityResponse() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+ RpcMessage rpcMsg = objectMapper.
+ readValue(opflexIdentityResponse, IdentityResponse.class);
+ assertTrue(rpcMsg instanceof IdentityResponse);
+ IdentityResponse opflexResponse = (IdentityResponse)rpcMsg;
+ assertTrue(opflexResponse.getId().equals(ID_UUID));
+ assertTrue(opflexResponse.getResult()
+ .getDomain().equals(DOMAIN_UUID));
+ assertTrue(opflexResponse.getResult()
+ .getName().equals(NAME));
+ assertTrue(opflexResponse.getResult()
+ .getMy_role().get(0).equals(Role.POLICY_REPOSITORY.toString()));
+ assertTrue(opflexResponse.getResult()
+ .getPeers().get(0).getRole().equals(Role.ENDPOINT_REGISTRY.toString()));
+ assertTrue(opflexResponse.getResult()
+ .getPeers().get(1).getRole().equals(Role.OBSERVER.toString()));
+ }
+
+ @Test
+ public void testPolicyRequest() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+ RpcMessage rpcMsg = objectMapper.
+ readValue(opflexPolicyRequest, PolicyResolutionRequest.class);
+ assertTrue(rpcMsg instanceof PolicyResolutionRequest);
+ PolicyResolutionRequest opflexRequest = (PolicyResolutionRequest)rpcMsg;
+ assertTrue(opflexRequest.getId().equals(ID_UUID));
+ assertTrue(opflexRequest.getMethod().equals(POLICY_REQUEST));
+ assertTrue(opflexRequest.getParams()
+ .get(0).getContext().equals(CONTEXT));
+ assertTrue(opflexRequest.getParams()
+ .get(0).getOn_behalf_of().equals(URI));
+ assertTrue(opflexRequest.getParams()
+ .get(0).getPolicy_name().equals(POLICY_NAME));
+ assertTrue(opflexRequest.getParams()
+ .get(0).getSubject().equals(SUBJECT));
+ assertTrue(opflexRequest.getParams()
+ .get(0).getData().equals(DATA));
+
+ }
+
+ @Test
+ public void testPolicyResponse() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+ RpcMessage rpcMsg = objectMapper.
+ readValue(opflexPolicyResponse, PolicyResolutionResponse.class);
+ assertTrue(rpcMsg instanceof PolicyResolutionResponse);
+ PolicyResolutionResponse opflexResponse = (PolicyResolutionResponse)rpcMsg;
+ assertTrue(opflexResponse.getId().equals(ID_UUID));
+ assertTrue(opflexResponse.getResult()
+ .getPolicy().get(0).equals(POLICY_NAME));
+ assertTrue(opflexResponse.getResult()
+ .getPrr() == Integer.parseInt(PRR));
+ }
+
+ @Test
+ public void testUpdateRequest() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+ RpcMessage rpcMsg = objectMapper.
+ readValue(opflexUpdateRequest, PolicyUpdateRequest.class);
+ assertTrue(rpcMsg instanceof PolicyUpdateRequest);
+ PolicyUpdateRequest opflexResponse = (PolicyUpdateRequest)rpcMsg;
+ assertTrue(opflexResponse.getId().equals(ID_UUID));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getSubtree().get(0).equals(POLICY_NAME));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getPrr() == Integer.parseInt(PRR));
+ }
+
+ @Test
+ public void testUpdateResponse() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+ RpcMessage rpcMsg = objectMapper.
+ readValue(opflexUpdateResponse, PolicyUpdateResponse.class);
+ assertTrue(rpcMsg instanceof PolicyUpdateResponse);
+ PolicyUpdateResponse opflexResponse = (PolicyUpdateResponse)rpcMsg;
+ assertTrue(opflexResponse.getId().equals(ID_UUID));
+ logger.warn("Result is {}", opflexResponse.getResult().toString());
+ }
+
+ @Test
+ public void testTriggerRequest() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+ RpcMessage rpcMsg = objectMapper.
+ readValue(opflexTriggerRequest, PolicyTriggerRequest.class);
+ assertTrue(rpcMsg instanceof PolicyTriggerRequest);
+ PolicyTriggerRequest opflexResponse = (PolicyTriggerRequest)rpcMsg;
+ assertTrue(opflexResponse.getId().equals(ID_UUID));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getPolicy_name().equals(POLICY_NAME));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getPolicy_type().equals(TYPE));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getContext().equals(CONTEXT));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getPrr() == Integer.parseInt(PRR));
+ }
+
+
+ @Test
+ public void testTriggerResponse() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+ RpcMessage rpcMsg = objectMapper.
+ readValue(opflexTriggerResponse, PolicyTriggerResponse.class);
+ assertTrue(rpcMsg instanceof PolicyTriggerResponse);
+ PolicyTriggerResponse opflexResponse = (PolicyTriggerResponse)rpcMsg;
+ assertTrue(opflexResponse.getId().equals(ID_UUID));
+ }
+
+
+ @Test
+ public void testEpDeclareRequest() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+ RpcMessage rpcMsg = objectMapper.
+ readValue(opflexEpDeclareRequest, EndpointDeclarationRequest.class);
+ assertTrue(rpcMsg instanceof EndpointDeclarationRequest);
+ EndpointDeclarationRequest opflexResponse = (EndpointDeclarationRequest)rpcMsg;
+ assertTrue(opflexResponse.getId().equals(ID_UUID));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getSubject().equals(SUBJECT));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getContext().equals(CONTEXT));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getPolicy_name().equals(POLICY_NAME));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getLocation().equals(LOCATION));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getIdentifier().get(0).equals(EP_ID_UUID));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getData().get(0).equals(DATA));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getStatus()
+ .equals(STATUS));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getPrr() == Integer.parseInt(PRR));
+ }
+
+ @Test
+ public void testEpDeclareResponse() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+ RpcMessage rpcMsg = objectMapper.
+ readValue(opflexEpDeclareResponse, EndpointDeclarationResponse.class);
+ assertTrue(rpcMsg instanceof EndpointDeclarationResponse);
+ EndpointDeclarationResponse opflexResponse = (EndpointDeclarationResponse)rpcMsg;
+ assertTrue(opflexResponse.getId().equals(ID_UUID));
+ }
+
+
+ @Test
+ public void testEpRequest() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+ RpcMessage rpcMsg = objectMapper.
+ readValue(opflexEpRequest, EndpointRequestRequest.class);
+ assertTrue(rpcMsg instanceof EndpointRequestRequest);
+ EndpointRequestRequest opflexResponse = (EndpointRequestRequest)rpcMsg;
+ assertTrue(opflexResponse.getId().equals(ID_UUID));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getSubject().equals(SUBJECT));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getContext().equals(CONTEXT));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getIdentifier().get(0).equals(EP_ID_UUID));
+ }
+
+
+ @Test
+ public void testEpResponse() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+ RpcMessage rpcMsg = objectMapper.
+ readValue(opflexEpResponse, EndpointRequestResponse.class);
+ assertTrue(rpcMsg instanceof EndpointRequestResponse);
+ EndpointRequestResponse opflexResponse = (EndpointRequestResponse)rpcMsg;
+ assertTrue(opflexResponse.getId().equals(ID_UUID));
+ assertTrue(opflexResponse.getResult()
+ .getEndpoint().get(0).getSubject().equals(SUBJECT));
+ assertTrue(opflexResponse.getResult()
+ .getEndpoint().get(0).getContext().equals(CONTEXT));
+ assertTrue(opflexResponse.getResult()
+ .getEndpoint().get(0).getPolicy_name().equals(POLICY_NAME));
+ assertTrue(opflexResponse.getResult()
+ .getEndpoint().get(0).getLocation().equals(LOCATION));
+ assertTrue(opflexResponse.getResult()
+ .getEndpoint().get(0).getIdentifier().get(0).equals(EP_ID_UUID));
+ assertTrue(opflexResponse.getResult()
+ .getEndpoint().get(0).getData().get(0).equals(DATA));
+ assertTrue(opflexResponse.getResult()
+ .getEndpoint().get(0).getStatus().equals(STATUS));
+ assertTrue(opflexResponse.getResult()
+ .getEndpoint().get(0).getPrr() == Integer.parseInt(PRR));
+
+ }
+
+ @Test
+ public void testEpPolicyUpdateRequest() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+ RpcMessage rpcMsg = objectMapper.
+ readValue(opflexEpPolicyUpdateRequest, EndpointPolicyUpdateRequest.class);
+ assertTrue(rpcMsg instanceof EndpointPolicyUpdateRequest);
+ EndpointPolicyUpdateRequest opflexResponse = (EndpointPolicyUpdateRequest)rpcMsg;
+ assertTrue(opflexResponse.getId().equals(ID_UUID));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getSubject().equals(SUBJECT));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getContext().equals(CONTEXT));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getPolicy_name().equals(POLICY_NAME));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getLocation().equals(LOCATION));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getIdentifier().get(0).equals(EP_ID_UUID));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getData().get(0).equals(DATA));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getStatus()
+ .equals(STATUS));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getTtl() == Integer.parseInt(PRR));
+ }
+
+
+ @Test
+ public void testEpPolicyUpdateResponse() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+ RpcMessage rpcMsg = objectMapper.
+ readValue(opflexEpPolicyUpdateResponse, EndpointPolicyUpdateResponse.class);
+ assertTrue(rpcMsg instanceof EndpointPolicyUpdateResponse);
+ EndpointPolicyUpdateResponse opflexResponse = (EndpointPolicyUpdateResponse)rpcMsg;
+ assertTrue(opflexResponse.getId().equals(ID_UUID));
+ }
+
+
+ @Test
+ public void testStateRequest() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+ RpcMessage rpcMsg = objectMapper.
+ readValue(opflexStateRequest, StateReportRequest.class);
+ assertTrue(rpcMsg instanceof StateReportRequest);
+ StateReportRequest opflexResponse = (StateReportRequest)rpcMsg;
+ assertTrue(opflexResponse.getId().equals(ID_UUID));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getSubject().equals(SUBJECT));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getContext().equals(CONTEXT));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getObject().equals(OBJECT));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getFault().get(0).equals(FAULT));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getEvent().get(0).equals(EVENT));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getStatistics().get(0).equals(STATISTICS));
+ assertTrue(opflexResponse.getParams()
+ .get(0).getHealth().get(0).equals(HEALTH));
+ }
+
+ @Test
+ public void testStateResponse() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+ RpcMessage rpcMsg = objectMapper.
+ readValue(opflexEpPolicyUpdateResponse, StateReportResponse.class);
+ assertTrue(rpcMsg instanceof StateReportResponse);
+ StateReportResponse opflexResponse = (StateReportResponse)rpcMsg;
+ assertTrue(opflexResponse.getId().equals(ID_UUID));
+ }
+}