/* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.sal.connector.remoterpc; import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.zeromq.ZMQ; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * */ public class ServerRequestHandler implements AutoCloseable{ private Logger _logger = LoggerFactory.getLogger(ServerRequestHandler.class); private final String DEFAULT_NAME = "remote-rpc-worker"; private String dealerAddress; private String serverAddress; private int workerCount; private ZMQ.Context context; private Broker.ProviderSession broker; private RequestHandlerThreadPool workerPool; private final AtomicInteger threadId = new AtomicInteger(); public ServerRequestHandler(ZMQ.Context context, Broker.ProviderSession session, int workerCount, String dealerAddress, String serverAddress) { this.context = context; this.dealerAddress = dealerAddress; this.serverAddress = serverAddress; this.broker = session; this.workerCount = workerCount; } public ThreadPoolExecutor getWorkerPool(){ return workerPool; } public void start(){ workerPool = new RequestHandlerThreadPool( workerCount, workerCount, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); //unbound is ok. Task will never be submitted for (int i=0;i> rpc = null; RpcResult result = null; //TODO Call this in a new thread with timeout try { rpc = broker.rpc( (QName) request.getRoute().getType(), XmlUtils.xmlToCompositeNode((String) request.getPayload())); result = (rpc != null) ? rpc.get() : null; } catch (Exception e) { _logger.debug("Broker threw [{}]", e); } CompositeNode payload = (result != null) ? result.getResult() : null; Message response = new Message.MessageBuilder() .type(Message.MessageType.RESPONSE) .sender(serverAddress) .route(request.getRoute()) .payload(XmlUtils.compositeNodeToXml(payload)) .build(); _logger.debug("Sending rpc response [{}]", response); try { socket.send(Message.serialize(response)); } catch (Exception e) { _logger.debug("rpc response send failed for message [{}]", response); _logger.debug("{}", e); } } } } catch (Exception e) { printException(e); } finally { closeSocket(socket); } } /** * @param socket * @return */ private Message parseMessage(ZMQ.Socket socket) throws Exception { byte[] bytes = socket.recv(); //this blocks _logger.debug("Received bytes:[{}]", bytes.length); return (Message) Message.deserialize(bytes); } private void printException(Exception e) { try (StringWriter s = new StringWriter(); PrintWriter p = new PrintWriter(s)) { e.printStackTrace(p); _logger.debug(s.toString()); } catch (IOException e1) {/*Ignore and continue*/ } } private void closeSocket(ZMQ.Socket socket) { try { if (socket != null) socket.close(); } catch (Exception x) { _logger.debug("Exception while closing socket [{}]", x); } finally { if (socket != null) socket.close(); } _logger.debug("Closing..."); } } /** * */ public class RequestHandlerThreadPool extends ThreadPoolExecutor{ public RequestHandlerThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void afterExecute(Runnable r, Throwable t) { if (isTerminating() || isTerminated() || isShutdown()) return; if ( t != null ){ _logger.debug("Exception caught while terminating worker [{},{}]", t.getClass(), t.getMessage()); } this.execute(new Worker(threadId.incrementAndGet())); super.afterExecute(r, null); } } }