Merge "Fixed deserialization of IdentityRefs in Restconf URI."
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / Sender.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7
8 package org.opendaylight.controller.sal.connector.remoterpc;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
13 import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16 import org.zeromq.ZMQ;
17
18 import java.io.IOException;
19 import java.util.concurrent.TimeoutException;
20
21 import static com.google.common.base.Preconditions.*;
22
23 /**
24  * Main server thread for sending requests.
25  */
26 public class Sender implements Runnable{
27
28   private final static Logger _logger = LoggerFactory.getLogger(Sender.class);
29   private final Client client;
30
31
32   
33   
34   public Sender(Client client) {
35     super();
36     this.client = client;
37   }
38
39 @Override
40   public void run() {
41     _logger.info("Starting...");
42
43     try (SocketManager socketManager = new SocketManager()){
44       while (!Thread.currentThread().isInterrupted()) {
45
46         //read incoming messages from blocking queue
47         MessageWrapper request = pollForRequest();
48
49         if (request != null) {
50           processRequest(socketManager, request);
51         }
52
53         flushSockets(socketManager);
54         pollForResponse(socketManager);
55         processResponse(socketManager);
56
57       }
58     } catch(Exception t){
59       _logger.error("Exception: [{}]", t);
60       _logger.error("Stopping...");
61     }
62   }
63
64   private void processResponse(SocketManager socketManager) {
65     for (int i = 0; i < socketManager.getPoller().getSize(); i++) {
66       // If any sockets get a response, process it
67       if (socketManager.getPoller().pollin(i)) {
68         Optional<RpcSocket> socket = socketManager.getManagedSocketFor(
69             socketManager.getPoller().getItem(i).getSocket());
70
71         checkState(socket.isPresent(), "Managed socket not found");
72
73         MessageWrapper response = socket.get().receive();
74         _logger.debug("Received rpc response [{}]", response.getMessage());
75
76         //TODO: handle exception and introduce timeout on receiver side
77         try {
78           response.getReceiveSocket().send(Message.serialize(response.getMessage()));
79         } catch (IOException e) {
80           e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
81         }
82       }
83     }
84   }
85
86   private void processRequest(SocketManager socketManager, MessageWrapper request) throws TimeoutException {
87
88     if ((request.getMessage() == null) ||
89         (request.getMessage().getRecipient() == null)) {
90       //invalid message. log and drop
91       _logger.error("Invalid request [{}]", request);
92       return;
93     }
94
95     RpcSocket socket =
96         socketManager.getManagedSocket(request.getMessage().getRecipient());
97
98     socket.send(request);
99   }
100
101   private void flushSockets(SocketManager socketManager){
102     for (RpcSocket socket : socketManager.getManagedSockets()){
103       socket.process();
104     }
105   }
106
107   private MessageWrapper pollForRequest(){
108     return client.getRequestQueue().poll();
109   }
110
111   private void pollForResponse(SocketManager socketManager){
112     try{
113       socketManager.getPoller().poll(10); //poll every 10ms
114     }catch (Throwable t) { /*Ignore and continue*/ }
115   }
116 }
117
118
119 /*
120 SCALA
121
122 package org.opendaylight.controller.sal.connector.remoterpc
123
124   import org.slf4j.{LoggerFactory, Logger}
125   import scala.collection.JavaConverters._
126   import scala.Some
127   import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, Message}
128 */
129 /**
130  * Main server thread for sending requests. This does not maintain any state. If the
131  * thread dies, it will be restarted
132  */
133 /*class Sender extends Runnable {
134   private val _logger: Logger = LoggerFactory.getLogger(Sender.this.getClass())
135
136   override def run = {
137     _logger.info("Sender starting...")
138     val socketManager = new SocketManager()
139
140     try {
141       while (!Thread.currentThread().isInterrupted) {
142         //read incoming messages from blocking queue
143         val request: MessageWrapper = Client.requestQueue.poll()
144
145         if (request != null) {
146           if ((request.message != null) &&
147             (request.message.getRecipient != null)) {
148
149             val socket = socketManager.getManagedSocket(request.message.getRecipient)
150             socket.send(request)
151           } else {
152             //invalid message. log and drop
153             _logger.error("Invalid request [{}]", request)
154           }
155         }
156
157         socketManager.getManagedSockets().asScala.map(s => s.process)
158
159         // Poll all sockets for responses every 1 sec
160         poll(socketManager)
161
162         // If any sockets get a response, process it
163         for (i <- 0 until socketManager.poller.getSize) {
164           if (socketManager.poller.pollin(i)) {
165             val socket = socketManager.getManagedSocketFor(socketManager.poller.getItem(i).getSocket)
166
167             socket match {
168               case None => //{
169                 _logger.error("Could not find a managed socket for zmq socket")
170                 throw new IllegalStateException("Could not find a managed socket for zmq socket")
171                 //}
172               case Some(s) => {
173                 val response = s.receive()
174                 _logger.debug("Received rpc response [{}]", response.message)
175                 response.receiveSocket.send(Message.serialize(response.message))
176               }
177             }
178           }
179         }
180
181       }
182     } catch{
183       case e:Exception => {
184         _logger.debug("Sender stopping due to exception")
185         e.printStackTrace()
186       }
187     } finally {
188       socketManager.stop
189     }
190   }
191
192   def poll(socketManager:SocketManager) = {
193     try{
194       socketManager.poller.poll(10)
195     }catch{
196       case t:Throwable => //ignore and continue
197     }
198   }
199 }
200
201
202 //    def newThread(r: Runnable): Thread = {
203 //      val t = new RequestHandler()
204 //      t.setUncaughtExceptionHandler(new RequestProcessorExceptionHandler)
205 //      t
206 //    }
207
208
209
210 /**
211  * Restarts the request processing server in the event of unforeseen exceptions
212  */
213 //private class RequestProcessorExceptionHandler extends UncaughtExceptionHandler {
214 //  def uncaughtException(t: Thread, e: Throwable) = {
215 //    _logger.error("Exception caught during request processing [{}]", e)
216 //    _logger.info("Restarting request processor server...")
217 //    RequestProcessor.start()
218 //  }