/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.sal.connector.remoterpc
10 import org.opendaylight.yangtools.yang.data.api.CompositeNode
11 import org.opendaylight.yangtools.yang.common.{RpcError, RpcResult, QName}
12 import org.opendaylight.controller.sal.core.api.RpcImplementation
14 import java.util.{UUID, Collections}
16 import org.opendaylight.controller.sal.common.util.{RpcErrors, Rpcs}
17 import org.slf4j.LoggerFactory
18 import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, RouteIdentifierImpl, Message}
19 import Message.MessageType
20 import java.util.concurrent._
21 import java.lang.InterruptedException
/**
 * An implementation of {@link RpcImplementation} that makes remote RPC calls.
 */
28 class Client extends RemoteRpcClient {
// Logger bound to the runtime class of this client.
private val _logger = LoggerFactory.getLogger(getClass)

// Longest time, in milliseconds, that `process` waits for free queue capacity.
private val TIMEOUT = 5000

// Bounded hand-off queue (capacity 100) that `process` puts outgoing requests on.
val requestQueue: LinkedBlockingQueue[MessageWrapper] =
  new LinkedBlockingQueue[MessageWrapper](100)

// Single worker thread used to run the Sender task.
val pool: ExecutorService = Executors.newSingleThreadExecutor()

// Source of the routing table used by `invokeRpc`; stays null until a provider is injected.
var routingTableProvider: RoutingTableProvider = null
/** Returns this very client; no new object is created. */
def getInstance: Client = this
/**
 * Stores the routing-table provider that `invokeRpc` consults for address lookups.
 *
 * @param provider provider to use from now on; assigned as-is, without a null check
 */
def setRoutingTableProvider(provider: RoutingTableProvider): Unit = {
  this.routingTableProvider = provider
}
/**
 * Reports the RPCs supported by this client.
 *
 * @return always the empty, immutable set
 */
def getSupportedRpcs: util.Set[QName] = Collections.emptySet[QName]()
/**
 * Sends an RPC request through the request queue and waits for the reply.
 *
 * Visible steps: look up the routes registered for the route identifier and
 * insist on exactly one non-null address, create a connected pair of in-process
 * ZMQ PAIR sockets on a random "inproc://" address, enqueue the request together
 * with the `sender` socket via `process`, then read the reply from `receiver`.
 *
 * @param rpc   name of the RPC; used in the "Address not found" failure message
 * @param input input of the call (presumably becomes the request payload -- the
 *              builder call that would set it is not visible here, TODO confirm)
 * @return a successful result carrying the reply payload cast to CompositeNode,
 *         or a failed result holding a single RpcError built from the caught exception
 *
 * NOTE(review): parts of this method are not visible in this view (the remainder
 * of the MessageBuilder chain and the try/catch framing). The `require` checks
 * throw IllegalArgumentException; whether they sit inside the try is not visible.
 * NOTE(review): no close() of `receiver` or `sender` is visible here -- confirm the
 * sockets are released on every path.
 */
49 def invokeRpc(rpc: QName, input: CompositeNode): RpcResult[CompositeNode] = {
51 val routeId = new RouteIdentifierImpl()
54 //lookup address for the rpc request
55 val routingTable = routingTableProvider.getRoutingTable()
56 require( routingTable != null, "Routing table not found. Exiting" )
58 val addresses:util.Set[String] = routingTable.getRoutes(routeId.toString)
59 require(addresses != null, "Address not found for rpc " + rpc);
60 require(addresses.size() == 1) //its a global service.
62 val address = addresses.iterator().next()
63 require(address != null, "Address is null")
65 //create in-process "pair" socket and pass it to sender thread
66 //Sender replies on this when result is available
67 val inProcAddress = "inproc://" + UUID.randomUUID()
68 val receiver = Context.zmqContext.socket(ZMQ.PAIR)
69 receiver.bind(inProcAddress);
71 val sender = Context.zmqContext.socket(ZMQ.PAIR)
72 sender.connect(inProcAddress)
74 val requestMessage = new Message.MessageBuilder()
75 .`type`(MessageType.REQUEST)
76 //.sender("tcp://localhost:8081")
82 _logger.debug("Queuing up request and expecting response on [{}]", inProcAddress)
// The connected `sender` end travels with the request so the reply can come back on the inproc pair.
84 val messageWrapper = new MessageWrapper(requestMessage, sender)
85 val errors = new util.ArrayList[RpcError]
// Enqueue the request; `process` throws TimeoutException when the queue stays full.
88 process(messageWrapper)
// Reads the reply from `receiver`.
// NOTE(review): no receive timeout is visible, so this may wait indefinitely -- confirm.
89 val response = parseMessage(receiver)
91 return Rpcs.getRpcResult(
92 true, response.getPayload.asInstanceOf[CompositeNode], Collections.emptySet())
95 case e: Exception => {
// Only the exception's message and cause are supplied; every other argument is null.
96 errors.add(RpcErrors.getRpcError(null,null,null,null,e.getMessage,null,e.getCause))
97 return Rpcs.getRpcResult(false, null, errors)
/**
 * Blocks on the socket for the reply and decodes the received bytes.
 *
 * @param receiver socket the reply is read from
 * @return the deserialized reply, cast to Message
 */
private def parseMessage(receiver: ZMQ.Socket): Message =
  Message.deserialize(receiver.recv()).asInstanceOf[Message]
// Submits a new Sender task to the single-thread executor `pool`.
// NOTE(review): the header of the enclosing definition is not visible in this view.
117 pool.execute(new Sender)
/**
 * Puts a request on the bounded request queue, waiting up to TIMEOUT
 * milliseconds for free capacity.
 *
 * @param msg wrapped request to enqueue
 * @throws TimeoutException if the queue is still full when the wait expires
 */
def process(msg: MessageWrapper): Unit = {
  _logger.debug("Processing message [{}]", msg)
  val queued = requestQueue.offer(msg, TIMEOUT, TimeUnit.MILLISECONDS)
  if (!queued) {
    throw new TimeoutException("Queue is full")
  }
}
// Shutdown sequence for the worker pool: request shutdown, then wait for termination.
// NOTE(review): the header of the enclosing definition and its try/catch framing are
// not visible in this view.
129 pool.shutdown() //initiate shutdown
130 _logger.debug("Client stopping...")
131 // Context.zmqContext.term();
132 // _logger.debug("ZMQ context terminated")
// First wait: give the pool up to 10 seconds to terminate.
136 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
// Second wait of up to 10 seconds; the error is logged only if this one expires too.
// NOTE(review): the statement between the two waits is not visible here -- confirm what it does.
138 if (!pool.awaitTermination(10, TimeUnit.SECONDS))
139 _logger.error("Client thread pool did not shut down");
// Handles the waiting thread being interrupted.
142 case ie:InterruptedException =>
143 // (Re-)Cancel if current thread also interrupted
145 // Preserve interrupt status
146 Thread.currentThread().interrupt();
148 _logger.debug("Client stopped")