Merge "Fix for Bugs 324"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / scala / org / opendaylight / controller / sal / connector / remoterpc / Client.scala
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7
8 package org.opendaylight.controller.sal.connector.remoterpc
9
10 import org.opendaylight.yangtools.yang.data.api.CompositeNode
11 import org.opendaylight.yangtools.yang.common.{RpcError, RpcResult, QName}
12 import org.opendaylight.controller.sal.core.api.RpcImplementation
13 import java.util
14 import java.util.{UUID, Collections}
15 import org.zeromq.ZMQ
16 import org.opendaylight.controller.sal.common.util.{RpcErrors, Rpcs}
17 import org.slf4j.LoggerFactory
18 import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, RouteIdentifierImpl, Message}
19 import Message.MessageType
20 import java.util.concurrent._
21 import java.lang.InterruptedException
22
23
24 /**
25  * An implementation of {@link RpcImplementation} that makes
26  * remote RPC calls
27  */
28 class Client extends RemoteRpcClient {
29
30   private val _logger = LoggerFactory.getLogger(this.getClass);
31
32   val requestQueue = new LinkedBlockingQueue[MessageWrapper](100)
33   val pool: ExecutorService = Executors.newSingleThreadExecutor()
34   private val TIMEOUT = 5000 //in ms
35   var routingTableProvider: RoutingTableProvider = null
36   
37   
38   def getInstance = this
39
40   
41   def setRoutingTableProvider(provider : RoutingTableProvider) = {
42     routingTableProvider = provider;
43   }
44   
45   def getSupportedRpcs: util.Set[QName] = {
46     Collections.emptySet()
47   }
48
49   def invokeRpc(rpc: QName, input: CompositeNode): RpcResult[CompositeNode] = {
50
51     val routeId = new RouteIdentifierImpl()
52     routeId.setType(rpc)
53
54     //lookup address for the rpc request
55     val routingTable = routingTableProvider.getRoutingTable()
56     require( routingTable != null, "Routing table not found. Exiting" )
57
58     val addresses:util.Set[String] = routingTable.getRoutes(routeId.toString)
59     require(addresses != null, "Address not found for rpc " + rpc);
60     require(addresses.size() == 1) //its a global service.
61
62     val address = addresses.iterator().next()
63     require(address != null, "Address is null")
64
65     //create in-process "pair" socket and pass it to sender thread
66     //Sender replies on this when result is available
67     val inProcAddress = "inproc://" + UUID.randomUUID()
68     val receiver = Context.zmqContext.socket(ZMQ.PAIR)
69     receiver.bind(inProcAddress);
70
71     val sender = Context.zmqContext.socket(ZMQ.PAIR)
72     sender.connect(inProcAddress)
73
74     val requestMessage = new Message.MessageBuilder()
75       .`type`(MessageType.REQUEST)
76       //.sender("tcp://localhost:8081")
77       .recipient(address)
78       .route(routeId)
79       .payload(input)
80       .build()
81
82     _logger.debug("Queuing up request and expecting response on [{}]", inProcAddress)
83
84     val messageWrapper = new MessageWrapper(requestMessage, sender)
85     val errors = new util.ArrayList[RpcError]
86
87     try {
88       process(messageWrapper)
89       val response = parseMessage(receiver)
90
91       return Rpcs.getRpcResult(
92         true, response.getPayload.asInstanceOf[CompositeNode], Collections.emptySet())
93
94     } catch {
95       case e: Exception => {
96         errors.add(RpcErrors.getRpcError(null,null,null,null,e.getMessage,null,e.getCause))
97         return Rpcs.getRpcResult(false, null, errors)
98       }
99     } finally {
100       receiver.close();
101       sender.close();
102     }
103
104   }
105
106   /**
107    * Block on socket for reply
108    * @param receiver
109    * @return
110    */
111   private def parseMessage(receiver:ZMQ.Socket): Message = {
112     val bytes = receiver.recv()
113     return  Message.deserialize(bytes).asInstanceOf[Message]
114   }
115
116   def start() = {
117     pool.execute(new Sender)
118   }
119
120   def process(msg: MessageWrapper) = {
121     _logger.debug("Processing message [{}]", msg)
122     val success = requestQueue.offer(msg, TIMEOUT, TimeUnit.MILLISECONDS)
123
124     if (!success) throw new TimeoutException("Queue is full");
125
126   }
127
128   def stop() = {
129     pool.shutdown() //intiate shutdown
130     _logger.debug("Client stopping...")
131     //    Context.zmqContext.term();
132     //    _logger.debug("ZMQ context terminated")
133
134     try {
135
136       if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
137         pool.shutdownNow();
138         if (!pool.awaitTermination(10, TimeUnit.SECONDS))
139           _logger.error("Client thread pool did not shut down");
140       }
141     } catch {
142       case ie:InterruptedException =>
143         // (Re-)Cancel if current thread also interrupted
144         pool.shutdownNow();
145         // Preserve interrupt status
146         Thread.currentThread().interrupt();
147     }
148     _logger.debug("Client stopped")
149   }
150   
151   def close() = {
152     stop();
153   }
154 }