Change-Id: I0d3fcc67eb2e4dada0c0daf954e42e638e555577
Signed-off-by: Abhishek Kumar <abhishk2@cisco.com>
import org.osgi.framework.BundleContext;
public class Activator extends AbstractProvider {
import org.osgi.framework.BundleContext;
public class Activator extends AbstractProvider {
-
- ZeroMqRpcRouter router;
-
- @Override
- public void onSessionInitiated(ProviderSession session) {
- router = ZeroMqRpcRouter.getInstance();
- router.setBrokerSession(session);
- router.start();
- }
-
- @Override
- protected void stopImpl(BundleContext context) {
- router.stop();
- }
+
+ ZeroMqRpcRouter router;
+
+ @Override
+ public void onSessionInitiated(ProviderSession session) {
+ router = ZeroMqRpcRouter.getInstance();
+ router.setBrokerSession(session);
+ router.start();
+ }
+
+ @Override
+ protected void stopImpl(BundleContext context) {
+ router.stop();
+ }
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
/**
import org.zeromq.ZMQ;
/**
private String pubIp = System.getProperty("pub.ip"); // other controller's ip
private String rpcPort = System.getProperty("rpc.port");// port on which RPC messages are received
private String pubIp = System.getProperty("pub.ip"); // other controller's ip
private String rpcPort = System.getProperty("rpc.port");// port on which RPC messages are received
+ private Logger _logger = LoggerFactory.getLogger(ZeroMqRpcRouter.class);
+
//Prevent instantiation
private ZeroMqRpcRouter() {
}
//Prevent instantiation
private ZeroMqRpcRouter() {
}
context = ZMQ.context(2);
publisher = context.socket(ZMQ.PUB);
int ret = publisher.bind("tcp://*:" + pubPort);
context = ZMQ.context(2);
publisher = context.socket(ZMQ.PUB);
int ret = publisher.bind("tcp://*:" + pubPort);
- System.out.println(Thread.currentThread().getName() + " Return(publish bind) :[" + ret + "]");
// serverPool = Executors.newSingleThreadExecutor();
serverPool = Executors.newCachedThreadPool();
handlersPool = Executors.newCachedThreadPool();
// serverPool = Executors.newSingleThreadExecutor();
serverPool = Executors.newCachedThreadPool();
handlersPool = Executors.newCachedThreadPool();
// Bind to publishing controller
subscriber = context.socket(ZMQ.SUB);
// Bind to publishing controller
subscriber = context.socket(ZMQ.SUB);
- subscriber.connect("tcp://" + pubIp + ":" + subPort);
- System.out.println(Thread.currentThread().getName() + "-Subscribing at[" + "tcp://"
- + pubIp + ":" + subPort + "]");
+ String pubAddress = "tcp://" + pubIp + ":" + subPort;
+ subscriber.connect(pubAddress);
+ _logger.debug("{} Subscribing at[{}]", Thread.currentThread().getName(), pubAddress);
//subscribe for announcements
//TODO: Message type would be changed. Update this
//subscribe for announcements
//TODO: Message type would be changed. Update this
ZMQ.Poller poller = new ZMQ.Poller(2);
poller.register(replySocket, ZMQ.Poller.POLLIN);
poller.register(subscriber, ZMQ.Poller.POLLIN);
ZMQ.Poller poller = new ZMQ.Poller(2);
poller.register(replySocket, ZMQ.Poller.POLLIN);
poller.register(subscriber, ZMQ.Poller.POLLIN);
- System.out.println(Thread.currentThread().getName() + "-Start Polling");
//TODO: Add code to restart the thread after exception
while (!Thread.currentThread().isInterrupted()) {
//TODO: Add code to restart the thread after exception
while (!Thread.currentThread().isInterrupted()) {
* @throws ClassNotFoundException
*/
private void handleAnnouncement() throws IOException, ClassNotFoundException {
* @throws ClassNotFoundException
*/
private void handleAnnouncement() throws IOException, ClassNotFoundException {
- System.out.println("\n" + Thread.currentThread().getName() + "-Received message");
+
+ _logger.info("Announcement received");
Message.MessageType topic = (MessageType) Message.deserialize(subscriber.recv());
Message.MessageType topic = (MessageType) Message.deserialize(subscriber.recv());
- System.out.println("Topic:[" + topic + "]");
if (subscriber.hasReceiveMore()) {
try {
Message m = (Message) Message.deserialize(subscriber.recv());
if (subscriber.hasReceiveMore()) {
try {
Message m = (Message) Message.deserialize(subscriber.recv());
+ _logger.debug("Announcement message [{}]", m);
+
// TODO: check on msg type or topic. Both
// should be same. Need to normalize.
if (Message.MessageType.ANNOUNCE == m.getType())
// TODO: check on msg type or topic. Both
// should be same. Need to normalize.
if (Message.MessageType.ANNOUNCE == m.getType())
*/
private void handleRpcCall() throws InterruptedException, ExecutionException {
try {
*/
private void handleRpcCall() throws InterruptedException, ExecutionException {
try {
- Message req = parseMessage(replySocket);
+ Message request = parseMessage(replySocket);
- System.out.println("Received RPC request [" + req + "]");
+ _logger.debug("Received rpc request [{}]", request);
// Call broker to process the message then reply
Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(
// Call broker to process the message then reply
Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(
- (QName) req.getRoute().getType(), (CompositeNode) req.getPayload());
+ (QName) request.getRoute().getType(), (CompositeNode) request.getPayload());
RpcResult<CompositeNode> result = rpc.get();
Message response = new Message.MessageBuilder()
.type(MessageType.RESPONSE)
.sender(localIp + ":" + rpcPort)
RpcResult<CompositeNode> result = rpc.get();
Message response = new Message.MessageBuilder()
.type(MessageType.RESPONSE)
.sender(localIp + ":" + rpcPort)
+ .route(request.getRoute())
//.payload(result) TODO: enable and test
.build();
replySocket.send(Message.serialize(response));
//.payload(result) TODO: enable and test
.build();
replySocket.send(Message.serialize(response));
- System.out.println("Sent RPC response [" + response + "]");
+ _logger.debug("Sent rpc response [{}]", response);
} catch (IOException ex) {
//TODO: handle exception and send error codes to caller
} catch (IOException ex) {
//TODO: handle exception and send error codes to caller
- System.out.println("Rpc request could not be handled" + ex);
.payload(input.getPayload())
.build();
.payload(input.getPayload())
.build();
+ _logger.debug("Sending rpc request [{}]", requestMessage);
+
RpcReply<Object> reply = null;
try {
RpcReply<Object> reply = null;
try {
- System.out.println("\n\nRPC Request [" + requestMessage + "]");
requestSocket.send(Message.serialize(requestMessage));
requestSocket.send(Message.serialize(requestMessage));
- final Message resp = parseMessage(requestSocket);
+ final Message response = parseMessage(requestSocket);
- System.out.println("\n\nRPC Response [" + resp + "]");
+ _logger.debug("Received response [{}]", response);
reply = new RpcReply<Object>() {
@Override
public Object getPayload() {
reply = new RpcReply<Object>() {
@Override
public Object getPayload() {
- return resp.getPayload();
+ return response.getPayload();
}
};
} catch (IOException ex) {
// TODO: Pass exception back to the caller
}
};
} catch (IOException ex) {
// TODO: Pass exception back to the caller
- System.out.println("Error in RPC send. Input could not be serialized[" + input + "]");
Runnable task = new Runnable() {
public void run() {
Runnable task = new Runnable() {
public void run() {
- System.out.println(
- Thread.currentThread().getName() + "-Publisher started at port[" + pubPort + "]");
-
- System.out.println(
- Thread.currentThread().getName() + "-Sending announcement[" + notice + "]");
-
publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
publisher.send(Message.serialize(notice));
publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
publisher.send(Message.serialize(notice));
+ _logger.debug("Announcement sent [{}]", notice);
} catch (IOException ex) {
} catch (IOException ex) {
- System.out.println("Error in publishing");
+ _logger.error("Error in sending announcement [{}]", notice);
- System.out.println(Thread.currentThread().getName() + "-Published message[" + notice
- + "]");
-
}
};
handlersPool.execute(task);
}
};
handlersPool.execute(task);
// TODO: do registration for instance based routing
QName rpcType = route.getType();
RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
// TODO: do registration for instance based routing
QName rpcType = route.getType();
RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
+ _logger.debug("Routing table updated");
Message msg = null;
try {
byte[] bytes = socket.recv();
Message msg = null;
try {
byte[] bytes = socket.recv();
- System.out.println("Received bytes:[" + bytes.length + "]");
+ _logger.debug("Received bytes:[{}]", bytes.length);
msg = (Message) Message.deserialize(bytes);
} catch (Throwable t) {
msg = (Message) Message.deserialize(bytes);
} catch (Throwable t) {
- System.out.println("Caught exception");
t.printStackTrace();
}
return msg;
t.printStackTrace();
}
return msg;
@Override
public void onRpcImplementationAdded(QName name) {
@Override
public void onRpcImplementationAdded(QName name) {
- System.out.println("In ZeroMQ Rpc Listener onRpcImplementationAdded()");
+ _logger.debug("Announcing registration for [{}]", name);
RouteIdentifierImpl routeId = new RouteIdentifierImpl();
routeId.setType(name);
RouteIdentifierImpl routeId = new RouteIdentifierImpl();
routeId.setType(name);