+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc
-
-import org.opendaylight.yangtools.yang.data.api.CompositeNode
-import org.opendaylight.yangtools.yang.common.{RpcError, RpcResult, QName}
-import org.opendaylight.controller.sal.core.api.RpcImplementation
-import java.util
-import java.util.{UUID, Collections}
-import org.zeromq.ZMQ
-import org.opendaylight.controller.sal.common.util.{RpcErrors, Rpcs}
-import org.slf4j.LoggerFactory
-import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, RouteIdentifierImpl, Message}
-import Message.MessageType
-import java.util.concurrent._
-import java.lang.InterruptedException
-
-
-/**
- * An implementation of {@link RpcImplementation} that makes
- * remote RPC calls
- */
-class Client extends RemoteRpcClient {
-
- private val _logger = LoggerFactory.getLogger(this.getClass);
-
- val requestQueue = new LinkedBlockingQueue[MessageWrapper](100)
- val pool: ExecutorService = Executors.newSingleThreadExecutor()
- private val TIMEOUT = 5000 //in ms
- var routingTableProvider: RoutingTableProvider = null
-
-
- def getInstance = this
-
-
- def setRoutingTableProvider(provider : RoutingTableProvider) = {
- routingTableProvider = provider;
- }
-
- def getSupportedRpcs: util.Set[QName] = {
- Collections.emptySet()
- }
-
- def invokeRpc(rpc: QName, input: CompositeNode): RpcResult[CompositeNode] = {
-
- val routeId = new RouteIdentifierImpl()
- routeId.setType(rpc)
-
- //lookup address for the rpc request
- val routingTable = routingTableProvider.getRoutingTable()
- require( routingTable != null, "Routing table not found. Exiting" )
-
- val addresses:util.Set[String] = routingTable.getRoutes(routeId.toString)
- require(addresses != null, "Address not found for rpc " + rpc);
- require(addresses.size() == 1) //its a global service.
-
- val address = addresses.iterator().next()
- require(address != null, "Address is null")
-
- //create in-process "pair" socket and pass it to sender thread
- //Sender replies on this when result is available
- val inProcAddress = "inproc://" + UUID.randomUUID()
- val receiver = Context.zmqContext.socket(ZMQ.PAIR)
- receiver.bind(inProcAddress);
-
- val sender = Context.zmqContext.socket(ZMQ.PAIR)
- sender.connect(inProcAddress)
-
- val requestMessage = new Message.MessageBuilder()
- .`type`(MessageType.REQUEST)
- //.sender("tcp://localhost:8081")
- .recipient(address)
- .route(routeId)
- .payload(input)
- .build()
-
- _logger.debug("Queuing up request and expecting response on [{}]", inProcAddress)
-
- val messageWrapper = new MessageWrapper(requestMessage, sender)
- val errors = new util.ArrayList[RpcError]
-
- try {
- process(messageWrapper)
- val response = parseMessage(receiver)
-
- return Rpcs.getRpcResult(
- true, response.getPayload.asInstanceOf[CompositeNode], Collections.emptySet())
-
- } catch {
- case e: Exception => {
- errors.add(RpcErrors.getRpcError(null,null,null,null,e.getMessage,null,e.getCause))
- return Rpcs.getRpcResult(false, null, errors)
- }
- } finally {
- receiver.close();
- sender.close();
- }
-
- }
-
- /**
- * Block on socket for reply
- * @param receiver
- * @return
- */
- private def parseMessage(receiver:ZMQ.Socket): Message = {
- val bytes = receiver.recv()
- return Message.deserialize(bytes).asInstanceOf[Message]
- }
-
- def start() = {
- pool.execute(new Sender)
- }
-
- def process(msg: MessageWrapper) = {
- _logger.debug("Processing message [{}]", msg)
- val success = requestQueue.offer(msg, TIMEOUT, TimeUnit.MILLISECONDS)
-
- if (!success) throw new TimeoutException("Queue is full");
-
- }
-
- def stop() = {
- pool.shutdown() //intiate shutdown
- _logger.debug("Client stopping...")
- // Context.zmqContext.term();
- // _logger.debug("ZMQ context terminated")
-
- try {
-
- if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
- pool.shutdownNow();
- if (!pool.awaitTermination(10, TimeUnit.SECONDS))
- _logger.error("Client thread pool did not shut down");
- }
- } catch {
- case ie:InterruptedException =>
- // (Re-)Cancel if current thread also interrupted
- pool.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- _logger.debug("Client stopped")
- }
-
- def close() = {
- stop();
- }
-}