7e8590ab9b0d4aab155bc22569bf81c3556431f8
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / RpcSocket.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7
8 package org.opendaylight.controller.sal.connector.remoterpc;
9
10 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
11 import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 import org.zeromq.ZMQ;
15
16 import java.io.IOException;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
20
21 /**
22  * A class encapsulating {@link ZMQ.Socket} of type {@link ZMQ.REQ}.
23  * It adds following capabilities:
24  * <li> Retry logic - Tries 3 times before giving up
25  * <li> Request times out after {@link TIMEOUT} property
26  * <li> The limitation of {@link ZMQ.REQ}/{@link ZMQ.REP} pair is that no 2 requests can be sent before
27  * the response for the 1st request is received. To overcome that, this socket queues all messages until
28  * the previous request has been responded.
29  */
30 public class RpcSocket {
31
32   // Constants
33   public static final int TIMEOUT = 2000;
34   public static final int QUEUE_SIZE = 10;
35   public static final int NUM_RETRIES = 3;
36   private static final Logger log = LoggerFactory.getLogger(RpcSocket.class);
37
38   private ZMQ.Socket socket;
39   private ZMQ.Poller poller;
40   private String address;
41   private SocketState state;
42   private long sendTime;
43   private int retriesLeft;
44   private LinkedBlockingQueue<MessageWrapper> inQueue;
45
46
47   public RpcSocket(String address, ZMQ.Poller poller) {
48     this.socket = null;
49     this.state = new IdleSocketState();
50     this.sendTime = -1;
51     this.retriesLeft = NUM_RETRIES;
52     this.inQueue = new LinkedBlockingQueue<MessageWrapper>(QUEUE_SIZE);
53     this.address = address;
54     this.poller = poller;
55     createSocket();
56   }
57
58   public ZMQ.Socket getSocket() {
59     return socket;
60   }
61
62   public String getAddress() {
63     return address;
64   }
65
66   public int getRetriesLeft() {
67     return retriesLeft;
68   }
69
70   public void setRetriesLeft(int retriesLeft) {
71     this.retriesLeft = retriesLeft;
72   }
73
74   public SocketState getState() {
75     return state;
76   }
77
78   public void setState(SocketState state) {
79     this.state = state;
80   }
81
82   public int getQueueSize() {
83     return inQueue.size();
84   }
85
86   public MessageWrapper removeCurrentRequest() {
87     return inQueue.poll();
88   }
89
90   public boolean hasTimedOut() {
91     return (System.currentTimeMillis() - sendTime > RpcSocket.TIMEOUT);
92   }
93
94   public void send(MessageWrapper request) throws TimeoutException {
95     try {
96       boolean success = inQueue.offer(request, TIMEOUT, TimeUnit.MILLISECONDS);    
97       if (!success) {
98         throw new TimeoutException("send :: Queue is full");
99       }
100       process();
101     }
102     catch (InterruptedException e) {
103       log.error("send : Thread interrupted while attempting to add request to inQueue", e);
104     }
105   }
106   
107   public MessageWrapper receive() {
108     Message response = parseMessage();
109     MessageWrapper messageWrapper = inQueue.poll(); //remove the message from queue
110     MessageWrapper responseMessageWrapper = new MessageWrapper(response, messageWrapper.getReceiveSocket());
111
112     state = new IdleSocketState();
113     retriesLeft = NUM_RETRIES;
114     return responseMessageWrapper;
115   }
116   
117   public void process() {
118     if (getQueueSize() > 0) //process if there's message in the queue
119       state.process(this);
120   }
121
122   // Called by IdleSocketState & BusySocketState
123   public void sendMessage() {
124     //Get the message from queue without removing it. For retries
125     MessageWrapper messageWrapper = inQueue.peek();
126     if (messageWrapper != null) {
127       Message message = messageWrapper.getMessage();
128       try {
129         socket.send(Message.serialize(message));
130       }
131       catch (IOException e) {
132         log.debug("Message send failed [{}]", message);
133         log.debug("Exception [{}]", e);
134       }
135       sendTime = System.currentTimeMillis();
136     }
137   }
138   
139   public Message parseMessage() {
140     Message parsedMessage = null;
141     byte[] bytes = socket.recv();
142     log.debug("Received bytes:[{}]", bytes.length);
143     try {
144       parsedMessage = (Message)Message.deserialize(bytes);
145     }
146     catch (IOException|ClassNotFoundException e) {
147       log.debug("parseMessage : Deserializing received bytes failed", e);
148     }
149
150     return parsedMessage;
151   }
152
153   public void recycleSocket() {
154     close();
155   }
156
157   public void close() {
158     socket.setLinger(10);
159     socket.close();
160   }
161
162   private void createSocket() {
163     socket = Context.getInstance().getZmqContext().socket(ZMQ.REQ);
164     socket.connect(address);
165     poller.register(socket, ZMQ.Poller.POLLIN);
166     state = new IdleSocketState();
167   }
168
169
170   /**
171    * Represents the state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
172    */
173   public static interface SocketState {
174
175     /* The processing actions to be performed in this state
176      */
177     public void process(RpcSocket socket);
178   }
179
180   /**
181    * Represents the idle state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
182    */
183   public static class IdleSocketState implements SocketState {
184
185     @Override
186     public void process(RpcSocket socket) {
187       socket.sendMessage();
188       socket.setState(new BusySocketState());
189       socket.setRetriesLeft(socket.getRetriesLeft()-1);
190     }
191   }
192
193   /**
194    * Represents the busy state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
195    */
196   public static class BusySocketState implements SocketState {
197
198     private static Logger log = LoggerFactory.getLogger(BusySocketState.class);
199
200     @Override
201     public void process(RpcSocket socket) {
202       if (socket.hasTimedOut()) {
203         if (socket.getRetriesLeft() > 0) {
204           log.debug("process : Request timed out, retrying now...");
205           socket.sendMessage();
206           socket.setRetriesLeft(socket.getRetriesLeft() - 1);
207         }
208         else {
209           // No more retries for current request, so stop processing the current request
210           MessageWrapper message = socket.removeCurrentRequest();
211           if (message != null) {
212             log.error("Unable to process rpc request [{}]", message);
213             socket.setState(new IdleSocketState());
214             socket.setRetriesLeft(NUM_RETRIES);
215           }
216         }
217       }
218       // Else no timeout, so allow processing to continue
219     }
220   }
221 }