/* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.sal.connector.remoterpc import org.opendaylight.yangtools.yang.data.api.CompositeNode import org.opendaylight.yangtools.yang.common.{RpcError, RpcResult, QName} import org.opendaylight.controller.sal.core.api.RpcImplementation import java.util import java.util.{UUID, Collections} import org.zeromq.ZMQ import org.opendaylight.controller.sal.common.util.{RpcErrors, Rpcs} import org.slf4j.LoggerFactory import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, RouteIdentifierImpl, Message} import Message.MessageType import java.util.concurrent._ import java.lang.InterruptedException /** * An implementation of {@link RpcImplementation} that makes * remote RPC calls */ class Client extends RemoteRpcClient { private val _logger = LoggerFactory.getLogger(this.getClass); val requestQueue = new LinkedBlockingQueue[MessageWrapper](100) val pool: ExecutorService = Executors.newSingleThreadExecutor() private val TIMEOUT = 5000 //in ms var routingTableProvider: RoutingTableProvider = null def getInstance = this def setRoutingTableProvider(provider : RoutingTableProvider) = { routingTableProvider = provider; } def getSupportedRpcs: util.Set[QName] = { Collections.emptySet() } def invokeRpc(rpc: QName, input: CompositeNode): RpcResult[CompositeNode] = { val routeId = new RouteIdentifierImpl() routeId.setType(rpc) //lookup address for the rpc request val routingTable = routingTableProvider.getRoutingTable() require( routingTable != null, "Routing table not found. Exiting" ) val addresses:util.Set[String] = routingTable.getRoutes(routeId.toString) require(addresses != null, "Address not found for rpc " + rpc); require(addresses.size() == 1) //its a global service. val address = addresses.iterator().next() require(address != null, "Address is null") //create in-process "pair" socket and pass it to sender thread //Sender replies on this when result is available val inProcAddress = "inproc://" + UUID.randomUUID() val receiver = Context.zmqContext.socket(ZMQ.PAIR) receiver.bind(inProcAddress); val sender = Context.zmqContext.socket(ZMQ.PAIR) sender.connect(inProcAddress) val requestMessage = new Message.MessageBuilder() .`type`(MessageType.REQUEST) //.sender("tcp://localhost:8081") .recipient(address) .route(routeId) .payload(input) .build() _logger.debug("Queuing up request and expecting response on [{}]", inProcAddress) val messageWrapper = new MessageWrapper(requestMessage, sender) val errors = new util.ArrayList[RpcError] try { process(messageWrapper) val response = parseMessage(receiver) return Rpcs.getRpcResult( true, response.getPayload.asInstanceOf[CompositeNode], Collections.emptySet()) } catch { case e: Exception => { errors.add(RpcErrors.getRpcError(null,null,null,null,e.getMessage,null,e.getCause)) return Rpcs.getRpcResult(false, null, errors) } } finally { receiver.close(); sender.close(); } } /** * Block on socket for reply * @param receiver * @return */ private def parseMessage(receiver:ZMQ.Socket): Message = { val bytes = receiver.recv() return Message.deserialize(bytes).asInstanceOf[Message] } def start() = { pool.execute(new Sender) } def process(msg: MessageWrapper) = { _logger.debug("Processing message [{}]", msg) val success = requestQueue.offer(msg, TIMEOUT, TimeUnit.MILLISECONDS) if (!success) throw new TimeoutException("Queue is full"); } def stop() = { pool.shutdown() //intiate shutdown _logger.debug("Client stopping...") // Context.zmqContext.term(); // _logger.debug("ZMQ context terminated") try { if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { pool.shutdownNow(); if (!pool.awaitTermination(10, TimeUnit.SECONDS)) _logger.error("Client thread pool did not shut down"); } } catch { case ie:InterruptedException => // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } _logger.debug("Client stopped") } def close() = { stop(); } }