+
+ class MessageHandler{
+ private ZMQ.Socket socket;
+ private Message message; //parsed message received on zmq server port
+ private boolean messageForBroker = false; //if the message is valid and not a "ping" message
+
+ public MessageHandler(ZMQ.Socket socket){
+ this.socket = socket;
+ }
+
+ void receiveMessage(){
+ byte[] bytes = socket.recv(); //this blocks
+ _logger.debug("Received bytes:[{}]", bytes.length);
+
+ Object objectRecvd = null;
+ try{
+ objectRecvd = Message.deserialize(bytes);
+ }catch (Exception e){
+ sendError(e.getMessage());
+ return;
+ }
+
+ if (!(objectRecvd instanceof Message)) {
+ sendError("Invalid message received");
+ return;
+ }
+
+ message = (Message) objectRecvd;
+
+ _logger.info("Received request [{}]", message);
+
+ if (Message.MessageType.PING == message.getType()){
+ sendPong();
+ return;
+ }
+
+ messageForBroker = true;
+ }
+
+ boolean hasMessageForBroker(){
+ return messageForBroker;
+ }
+
+ Message getMessage(){
+ return message;
+ }
+
+ void sendResponse(RpcResult<CompositeNode> result){
+ CompositeNode payload = (result != null) ? result.getResult() : null;
+
+ String recipient = null;
+ RpcRouter.RouteIdentifier routeId = null;
+
+ if (message != null) {
+ recipient = message.getSender();
+ routeId = message.getRoute();
+ }
+
+ Message response = new Message.MessageBuilder()
+ .type(Message.MessageType.RESPONSE)
+ .sender(serverAddress)
+ .recipient(recipient)
+ .route(routeId)
+ .payload(XmlUtils.compositeNodeToXml(payload))
+ .build();
+
+ send(response);
+ }
+
+ private void sendError(String msg){
+ Message errorResponse = new Message.MessageBuilder()
+ .type(Message.MessageType.ERROR)
+ .sender(serverAddress)
+ .payload(msg)
+ .build();
+
+ send(errorResponse);
+ }
+
+ private void sendPong(){
+ Message pong = new Message.MessageBuilder()
+ .type(Message.MessageType.PONG)
+ .sender(serverAddress)
+ .build();
+
+ send(pong);
+ }
+
+ private void send(Message msg){
+ byte[] serializedMessage = null;
+ try {
+ serializedMessage = Message.serialize(msg);
+ } catch (Exception e) {
+ _logger.debug("Unexpected error during serialization of response [{}]", msg);
+ return;
+ }
+
+ if (serializedMessage != null)
+ if (socket.send(serializedMessage))
+ _logger.info("Response sent [{}]", msg);
+ else _logger.debug("Failed to send serialized message");
+ }
+ }