/*
* Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.controller.sal.connector.remoterpc;
import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* A class encapsulating {@link ZMQ.Socket} of type {@link ZMQ.REQ}.
* It adds following capabilities:
*
Retry logic - Tries 3 times before giving up
* Request times out after {@link TIMEOUT} property
* The limitation of {@link ZMQ.REQ}/{@link ZMQ.REP} pair is that no 2 requests can be sent before
* the response for the 1st request is received. To overcome that, this socket queues all messages until
* the previous request has been responded.
*/
public class RpcSocket {
// Constants
public static final int TIMEOUT = 2000;
public static final int QUEUE_SIZE = 10;
public static final int NUM_RETRIES = 3;
private static final Logger log = LoggerFactory.getLogger(RpcSocket.class);
private ZMQ.Socket socket;
private ZMQ.Poller poller;
private String address;
private SocketState state;
private long sendTime;
private int retriesLeft;
private LinkedBlockingQueue inQueue;
public RpcSocket(String address, ZMQ.Poller poller) {
this.socket = null;
this.state = new IdleSocketState();
this.sendTime = -1;
this.retriesLeft = NUM_RETRIES;
this.inQueue = new LinkedBlockingQueue(QUEUE_SIZE);
this.address = address;
this.poller = poller;
createSocket();
}
public ZMQ.Socket getSocket() {
return socket;
}
public String getAddress() {
return address;
}
public int getRetriesLeft() {
return retriesLeft;
}
public void setRetriesLeft(int retriesLeft) {
this.retriesLeft = retriesLeft;
}
public SocketState getState() {
return state;
}
public void setState(SocketState state) {
this.state = state;
}
public int getQueueSize() {
return inQueue.size();
}
public MessageWrapper removeCurrentRequest() {
return inQueue.poll();
}
public boolean hasTimedOut() {
return (System.currentTimeMillis() - sendTime > RpcSocket.TIMEOUT);
}
public void send(MessageWrapper request) throws TimeoutException {
try {
boolean success = inQueue.offer(request, TIMEOUT, TimeUnit.MILLISECONDS);
if (!success) {
throw new TimeoutException("send :: Queue is full");
}
process();
}
catch (InterruptedException e) {
log.error("send : Thread interrupted while attempting to add request to inQueue", e);
}
}
public MessageWrapper receive() {
Message response = parseMessage();
MessageWrapper messageWrapper = inQueue.poll(); //remove the message from queue
MessageWrapper responseMessageWrapper = new MessageWrapper(response, messageWrapper.getReceiveSocket());
state = new IdleSocketState();
retriesLeft = NUM_RETRIES;
return responseMessageWrapper;
}
public void process() {
if (getQueueSize() > 0) //process if there's message in the queue
state.process(this);
}
// Called by IdleSocketState & BusySocketState
public void sendMessage() {
//Get the message from queue without removing it. For retries
MessageWrapper messageWrapper = inQueue.peek();
if (messageWrapper != null) {
Message message = messageWrapper.getMessage();
try {
socket.send(Message.serialize(message));
}
catch (IOException e) {
log.debug("Message send failed [{}]", message);
log.debug("Exception [{}]", e);
}
sendTime = System.currentTimeMillis();
}
}
public Message parseMessage() {
Message parsedMessage = null;
byte[] bytes = socket.recv();
log.debug("Received bytes:[{}]", bytes.length);
try {
parsedMessage = (Message)Message.deserialize(bytes);
}
catch (IOException|ClassNotFoundException e) {
log.debug("parseMessage : Deserializing received bytes failed", e);
}
return parsedMessage;
}
public void recycleSocket() {
close();
}
public void close() {
socket.setLinger(10);
socket.close();
}
private void createSocket() {
socket = Context.getInstance().getZmqContext().socket(ZMQ.REQ);
socket.connect(address);
poller.register(socket, ZMQ.Poller.POLLIN);
state = new IdleSocketState();
}
/**
* Represents the state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
*/
public static interface SocketState {
/* The processing actions to be performed in this state
*/
public void process(RpcSocket socket);
}
/**
* Represents the idle state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
*/
public static class IdleSocketState implements SocketState {
@Override
public void process(RpcSocket socket) {
socket.sendMessage();
socket.setState(new BusySocketState());
socket.setRetriesLeft(socket.getRetriesLeft()-1);
}
}
/**
* Represents the busy state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
*/
public static class BusySocketState implements SocketState {
private static Logger log = LoggerFactory.getLogger(BusySocketState.class);
@Override
public void process(RpcSocket socket) {
if (socket.hasTimedOut()) {
if (socket.getRetriesLeft() > 0) {
log.debug("process : Request timed out, retrying now...");
socket.sendMessage();
socket.setRetriesLeft(socket.getRetriesLeft() - 1);
}
else {
// No more retries for current request, so stop processing the current request
MessageWrapper message = socket.removeCurrentRequest();
if (message != null) {
log.error("Unable to process rpc request [{}]", message);
socket.setState(new IdleSocketState());
socket.setRetriesLeft(NUM_RETRIES);
}
}
}
// Else no timeout, so allow processing to continue
}
}
}