2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connector.remoterpc;
10 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
11 import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 import org.zeromq.ZMQ;
16 import java.io.IOException;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
22 * A class encapsulating a {@link ZMQ.Socket} of type {@link ZMQ#REQ}.
23 * It adds the following capabilities:
24 * <li> Retry logic - a request is tried {@link #NUM_RETRIES} times before giving up
25 * <li> A request times out after {@link #TIMEOUT} milliseconds
26 * <li> A limitation of the {@link ZMQ#REQ}/{@link ZMQ#REP} pair is that a second request cannot be sent before
27 * the response to the first request is received. To overcome that, this socket queues messages until
28 * the previous request has been responded to.
30 public class RpcSocket {
33 public static final int TIMEOUT = 2000;
34 public static final int QUEUE_SIZE = 10;
35 public static final int NUM_RETRIES = 3;
36 private static final Logger log = LoggerFactory.getLogger(RpcSocket.class);
38 private ZMQ.Socket socket;
39 private ZMQ.Poller poller;
40 private String address;
41 private SocketState state;
42 private long sendTime;
43 private int retriesLeft;
44 private LinkedBlockingQueue<MessageWrapper> inQueue;
/**
 * Creates a request-socket wrapper for the given endpoint.
 *
 * The wrapper starts in the idle state with a full retry budget and an empty,
 * bounded request queue.
 *
 * @param address endpoint stored for this socket
 * @param poller  ZMQ poller supplied by the caller
 *                (NOTE(review): presumably the poller the socket gets registered with; confirm)
 */
47 public RpcSocket(String address, ZMQ.Poller poller) {
// Start idle, with NUM_RETRIES attempts available for the first request.
49 this.state = new IdleSocketState();
51 this.retriesLeft = NUM_RETRIES;
// Bounded queue: at most QUEUE_SIZE requests may be pending at once.
52 this.inQueue = new LinkedBlockingQueue<MessageWrapper>(QUEUE_SIZE);
53 this.address = address;
/**
 * Accessor for the wrapped {@link ZMQ.Socket}.
 * NOTE(review): presumably returns the {@code socket} field; confirm.
 */
58 public ZMQ.Socket getSocket() {
/**
 * Accessor for this socket's endpoint address.
 * NOTE(review): presumably returns the {@code address} field set in the constructor; confirm.
 */
62 public String getAddress() {
/**
 * Accessor for the retry budget; the state classes read it to decide whether a
 * timed-out request may be re-sent.
 * NOTE(review): presumably returns the {@code retriesLeft} field; confirm.
 */
66 public int getRetriesLeft() {
70 public void setRetriesLeft(int retriesLeft) {
71 this.retriesLeft = retriesLeft;
/**
 * Accessor for the current {@link SocketState}.
 * NOTE(review): presumably returns the {@code state} field; confirm.
 */
74 public SocketState getState() {
/**
 * Mutator used by the state classes to switch this socket between idle and busy.
 * NOTE(review): presumably stores {@code state} in the {@code state} field; confirm.
 *
 * @param state the state to switch to
 */
78 public void setState(SocketState state) {
82 public int getQueueSize() {
83 return inQueue.size();
86 public MessageWrapper removeCurrentRequest() {
87 return inQueue.poll();
90 public boolean hasTimedOut() {
91 return (System.currentTimeMillis() - sendTime > RpcSocket.TIMEOUT);
/**
 * Queues a request for transmission, waiting up to {@link #TIMEOUT} milliseconds
 * for space in the bounded queue.
 *
 * @param request the request to enqueue
 * @throws TimeoutException with message "send :: Queue is full"
 *         (NOTE(review): presumably only when the offer below reports failure; confirm)
 */
94 public void send(MessageWrapper request) throws TimeoutException {
// offer() blocks for at most TIMEOUT ms and returns false if the queue stayed full.
96 boolean success = inQueue.offer(request, TIMEOUT, TimeUnit.MILLISECONDS);
98 throw new TimeoutException("send :: Queue is full");
102 catch (InterruptedException e) {
// NOTE(review): the interruption is only logged here. Consider restoring the flag with
// Thread.currentThread().interrupt(); as written the caller also gets no signal that
// the request was never queued.
103 log.error("send : Thread interrupted while attempting to add request to inQueue", e);
/**
 * Consumes the response for the request at the head of the queue.
 *
 * Parses the reply from the socket, dequeues the head request, then returns the
 * socket to the idle state with a full retry budget.
 *
 * @return a wrapper pairing the parsed response (which may be {@code null} if parsing
 *         failed) with the receive socket of the dequeued request
 */
107 public MessageWrapper receive() {
108 Message response = parseMessage();
109 MessageWrapper messageWrapper = inQueue.poll(); //remove the message from queue
// NOTE(review): poll() returns null when the queue is empty (for example after the request
// was already dropped through removeCurrentRequest()). The dereference on the next line
// would then throw NullPointerException before the state is reset.
110 MessageWrapper responseMessageWrapper = new MessageWrapper(response, messageWrapper.getReceiveSocket());
// Request finished: go back to idle and restore the retry budget for the next one.
112 state = new IdleSocketState();
113 retriesLeft = NUM_RETRIES;
114 return responseMessageWrapper;
/**
 * Runs one processing pass, acting only when at least one request is queued.
 * NOTE(review): the guarded action is presumably a delegation to the current
 * {@link SocketState}; confirm.
 */
117 public void process() {
118 if (getQueueSize() > 0) //process if there's message in the queue
122 // Called by IdleSocketState & BusySocketState
123 public void sendMessage() {
124 //Get the message from queue without removing it. For retries
125 MessageWrapper messageWrapper = inQueue.peek();
126 if (messageWrapper != null) {
127 Message message = messageWrapper.getMessage();
129 socket.send(Message.serialize(message));
131 catch (IOException e) {
132 log.debug("Message send failed [{}]", message);
133 log.debug("Exception [{}]", e);
135 sendTime = System.currentTimeMillis();
139 public Message parseMessage() {
140 Message parsedMessage = null;
141 byte[] bytes = socket.recv();
142 log.debug("Received bytes:[{}]", bytes.length);
144 parsedMessage = (Message)Message.deserialize(bytes);
146 catch (IOException|ClassNotFoundException e) {
147 log.debug("parseMessage : Deserializing received bytes failed", e);
150 return parsedMessage;
/**
 * NOTE(review): the name suggests the underlying ZMQ socket is closed and re-created;
 * confirm against the method body.
 */
153 public void recycleSocket() {
/**
 * Prepares the underlying ZMQ socket for shutdown.
 * NOTE(review): presumably followed by closing the socket; confirm.
 */
157 public void close() {
// Linger of 10 (milliseconds, per ZMQ_LINGER): bounds how long unsent messages are kept on close.
158 socket.setLinger(10);
162 private void createSocket() {
163 socket = Context.getInstance().getZmqContext().socket(ZMQ.REQ);
164 socket.connect(address);
165 poller.register(socket, ZMQ.Poller.POLLIN);
166 state = new IdleSocketState();
171 * Represents the state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
173 public static interface SocketState {
175 /* The processing actions to be performed in this state
177 public void process(RpcSocket socket);
181 * Represents the idle state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
183 public static class IdleSocketState implements SocketState {
186 public void process(RpcSocket socket) {
187 socket.sendMessage();
188 socket.setState(new BusySocketState());
189 socket.setRetriesLeft(socket.getRetriesLeft()-1);
194 * Represents the busy state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
196 public static class BusySocketState implements SocketState {
198 private static Logger log = LoggerFactory.getLogger(BusySocketState.class);
201 public void process(RpcSocket socket) {
202 if (socket.hasTimedOut()) {
203 if (socket.getRetriesLeft() > 0) {
204 log.debug("process : Request timed out, retrying now...");
205 socket.sendMessage();
206 socket.setRetriesLeft(socket.getRetriesLeft() - 1);
209 // No more retries for current request, so stop processing the current request
210 MessageWrapper message = socket.removeCurrentRequest();
211 if (message != null) {
212 log.error("Unable to process rpc request [{}]", message);
213 socket.setState(new IdleSocketState());
214 socket.setRetriesLeft(NUM_RETRIES);
218 // Else no timeout, so allow processing to continue