Implementation for enabling remote rpc calls between 2 instances of md-sal
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / SocketManager.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7
8 package org.opendaylight.controller.sal.connector.remoterpc;
9
10 import com.google.common.base.Optional;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13 import org.zeromq.ZMQ;
14 import java.net.UnknownHostException;
15 import java.util.Collection;
16 import java.util.concurrent.ConcurrentHashMap;
17
18
19 /**
20  * Manages creation of {@link RpcSocket} and their registration with {@link ZMQ.Poller}
21  */
22 public class SocketManager implements AutoCloseable{
23   private static final Logger log = LoggerFactory.getLogger(SocketManager.class);
24
25   /*
26    * RpcSockets mapped by network address its connected to
27    */
28   private ConcurrentHashMap<String, RpcSocket> managedSockets = new ConcurrentHashMap<String, RpcSocket>();
29
30   private ZMQ.Poller _poller = new ZMQ.Poller(2); //randomly selected size. Poller grows automatically
31
32   /**
33    * Returns a {@link RpcSocket} for the given address
34    * @param address network address with port eg: 10.199.199.20:5554
35    * @return
36    */
37   public RpcSocket getManagedSocket(String address) throws IllegalArgumentException {
38     //Precondition
39     if (!address.matches("(tcp://)(.*)(:)(\\d*)")) {
40       throw new IllegalArgumentException("Address must of format 'tcp://<ip address>:<port>' but is " + address);
41     }
42
43     if (!managedSockets.containsKey(address)) {
44       log.debug("{} Creating new socket for {}", Thread.currentThread().getName());
45       RpcSocket socket = new RpcSocket(address, _poller);
46       managedSockets.put(address, socket);
47     }
48
49     return managedSockets.get(address);
50   }
51
52   /**
53    * Returns a {@link RpcSocket} for the given {@link ZMQ.Socket}
54    * @param socket
55    * @return
56    */
57   public Optional<RpcSocket> getManagedSocketFor(ZMQ.Socket socket) {
58     for (RpcSocket rpcSocket : managedSockets.values()) {
59       if (rpcSocket.getSocket().equals(socket)) {
60         return Optional.of(rpcSocket);
61       }
62     }
63     return Optional.absent();
64   }
65
66   /**
67    * Return a collection of all managed sockets
68    * @return
69    */
70   public Collection<RpcSocket> getManagedSockets() {
71     return managedSockets.values();
72   }
73
74   /**
75    * Returns the {@link ZMQ.Poller}
76    * @return
77    */
78   public ZMQ.Poller getPoller() {
79     return _poller;
80   }
81
82   /**
83    * This should be called when stopping the server to close all the sockets
84    * @return
85    */
86   @Override
87   public void close() throws Exception {
88     log.debug("Stopping...");
89     for (RpcSocket socket : managedSockets.values()) {
90       socket.close();
91     }
92     managedSockets.clear();
93     log.debug("Stopped");
94   }
95 }