Removing sysouts and adding log statments
[controller.git] / opendaylight / md-sal / sal-zeromq-connector / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / router / zeromq / ZeroMqRpcRouter.java
index 17b14800eee8f460e1b70384089a8e38b0545332..af9480432206455ba38a49bade85d59a10e0a772 100644 (file)
@@ -35,6 +35,8 @@ import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.zeromq.ZMQ;
 
 /**
@@ -71,6 +73,8 @@ public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifi
   private String pubIp = System.getProperty("pub.ip"); // other controller's ip
   private String rpcPort = System.getProperty("rpc.port");// port on which RPC messages are received
 
+  private Logger _logger = LoggerFactory.getLogger(ZeroMqRpcRouter.class);
+
   //Prevent instantiation
   private ZeroMqRpcRouter() {
   }
@@ -83,7 +87,6 @@ public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifi
     context = ZMQ.context(2);
     publisher = context.socket(ZMQ.PUB);
     int ret = publisher.bind("tcp://*:" + pubPort);
-    System.out.println(Thread.currentThread().getName() + " Return(publish bind) :[" + ret + "]");
     // serverPool = Executors.newSingleThreadExecutor();
     serverPool = Executors.newCachedThreadPool();
     handlersPool = Executors.newCachedThreadPool();
@@ -133,9 +136,9 @@ public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifi
 
           // Bind to publishing controller
           subscriber = context.socket(ZMQ.SUB);
-          subscriber.connect("tcp://" + pubIp + ":" + subPort);
-          System.out.println(Thread.currentThread().getName() + "-Subscribing at[" + "tcp://"
-              + pubIp + ":" + subPort + "]");
+          String pubAddress = "tcp://" + pubIp + ":" + subPort;
+          subscriber.connect(pubAddress);
+          _logger.debug("{} Subscribing at[{}]", Thread.currentThread().getName(), pubAddress);
 
           //subscribe for announcements
           //TODO: Message type would be changed. Update this
@@ -145,7 +148,6 @@ public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifi
           ZMQ.Poller poller = new ZMQ.Poller(2);
           poller.register(replySocket, ZMQ.Poller.POLLIN);
           poller.register(subscriber, ZMQ.Poller.POLLIN);
-          System.out.println(Thread.currentThread().getName() + "-Start Polling");
 
           //TODO: Add code to restart the thread after exception
           while (!Thread.currentThread().isInterrupted()) {
@@ -175,14 +177,15 @@ public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifi
    * @throws ClassNotFoundException
    */
   private void handleAnnouncement() throws IOException, ClassNotFoundException {
-    System.out.println("\n" + Thread.currentThread().getName() + "-Received message");
+
+    _logger.info("Announcement received");
     Message.MessageType topic = (MessageType) Message.deserialize(subscriber.recv());
-    System.out.println("Topic:[" + topic + "]");
 
     if (subscriber.hasReceiveMore()) {
       try {
         Message m = (Message) Message.deserialize(subscriber.recv());
-        System.out.println(m);
+        _logger.debug("Announcement message [{}]", m);
+
         // TODO: check on msg type or topic. Both
         // should be same. Need to normalize.
         if (Message.MessageType.ANNOUNCE == m.getType())
@@ -200,30 +203,30 @@ public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifi
    */
   private void handleRpcCall() throws InterruptedException, ExecutionException {
     try {
-      Message req = parseMessage(replySocket);
+      Message request = parseMessage(replySocket);
 
-      System.out.println("Received RPC request [" + req + "]");
+      _logger.debug("Received rpc request [{}]", request);
 
       // Call broker to process the message then reply
       Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(
-          (QName) req.getRoute().getType(), (CompositeNode) req.getPayload());
+          (QName) request.getRoute().getType(), (CompositeNode) request.getPayload());
 
       RpcResult<CompositeNode> result = rpc.get();
 
       Message response = new Message.MessageBuilder()
           .type(MessageType.RESPONSE)
           .sender(localIp + ":" + rpcPort)
-          .route(req.getRoute())
+          .route(request.getRoute())
           //.payload(result)    TODO: enable and test
           .build();
 
       replySocket.send(Message.serialize(response));
 
-      System.out.println("Sent RPC response [" + response + "]");
+      _logger.debug("Sent rpc response [{}]", response);
 
     } catch (IOException ex) {
       //TODO: handle exception and send error codes to caller
-      System.out.println("Rpc request could not be handled" + ex);
+      ex.printStackTrace();
     }
   }
 
@@ -248,26 +251,27 @@ public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifi
             .payload(input.getPayload())
             .build();
 
+        _logger.debug("Sending rpc request [{}]", requestMessage);
+
         RpcReply<Object> reply = null;
 
         try {
-          System.out.println("\n\nRPC Request [" + requestMessage + "]");
 
           requestSocket.send(Message.serialize(requestMessage));
-          final Message resp = parseMessage(requestSocket);
+          final Message response = parseMessage(requestSocket);
 
-          System.out.println("\n\nRPC Response [" + resp + "]");
+          _logger.debug("Received response [{}]", response);
 
           reply = new RpcReply<Object>() {
 
             @Override
             public Object getPayload() {
-              return resp.getPayload();
+              return response.getPayload();
             }
           };
         } catch (IOException ex) {
           // TODO: Pass exception back to the caller
-          System.out.println("Error in RPC send. Input could not be serialized[" + input + "]");
+          ex.printStackTrace();
         }
 
         return reply;
@@ -285,24 +289,15 @@ public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifi
     Runnable task = new Runnable() {
       public void run() {
 
-        System.out.println(
-            Thread.currentThread().getName() + "-Publisher started at port[" + pubPort + "]");
-
         try {
 
-          System.out.println(
-              Thread.currentThread().getName() + "-Sending announcement[" + notice + "]");
-
           publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
           publisher.send(Message.serialize(notice));
-
+          _logger.debug("Announcement sent [{}]", notice);
         } catch (IOException ex) {
-          System.out.println("Error in publishing");
+          _logger.error("Error in sending announcement [{}]", notice);
           ex.printStackTrace();
         }
-        System.out.println(Thread.currentThread().getName() + "-Published message[" + notice
-            + "]");
-
       }
     };
     handlersPool.execute(task);
@@ -352,6 +347,7 @@ public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifi
     // TODO: do registration for instance based routing
     QName rpcType = route.getType();
     RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
+    _logger.debug("Routing table updated");
   }
 
   /**
@@ -363,10 +359,9 @@ public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifi
     Message msg = null;
     try {
       byte[] bytes = socket.recv();
-      System.out.println("Received bytes:[" + bytes.length + "]");
+      _logger.debug("Received bytes:[{}]", bytes.length);
       msg = (Message) Message.deserialize(bytes);
     } catch (Throwable t) {
-      System.out.println("Caught exception");
       t.printStackTrace();
     }
     return msg;
@@ -425,8 +420,8 @@ public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifi
 
     @Override
     public void onRpcImplementationAdded(QName name) {
-      System.out.println("In ZeroMQ Rpc Listener onRpcImplementationAdded()");
 
+      _logger.debug("Announcing registration for [{}]", name);
       RouteIdentifierImpl routeId = new RouteIdentifierImpl();
       routeId.setType(name);