Implementation for enabling remote rpc calls between 2 instances of md-sal
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / SocketManager.java
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManager.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManager.java
new file mode 100644 (file)
index 0000000..588a299
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Manages creation of {@link RpcSocket} and their registration with {@link ZMQ.Poller}
+ */
+public class SocketManager implements AutoCloseable{
+  private static final Logger log = LoggerFactory.getLogger(SocketManager.class);
+
+  /*
+   * RpcSockets mapped by network address its connected to
+   */
+  private ConcurrentHashMap<String, RpcSocket> managedSockets = new ConcurrentHashMap<String, RpcSocket>();
+
+  private ZMQ.Poller _poller = new ZMQ.Poller(2); //randomly selected size. Poller grows automatically
+
+  /**
+   * Returns a {@link RpcSocket} for the given address
+   * @param address network address with port eg: 10.199.199.20:5554
+   * @return
+   */
+  public RpcSocket getManagedSocket(String address) throws IllegalArgumentException {
+    //Precondition
+    if (!address.matches("(tcp://)(.*)(:)(\\d*)")) {
+      throw new IllegalArgumentException("Address must of format 'tcp://<ip address>:<port>' but is " + address);
+    }
+
+    if (!managedSockets.containsKey(address)) {
+      log.debug("{} Creating new socket for {}", Thread.currentThread().getName());
+      RpcSocket socket = new RpcSocket(address, _poller);
+      managedSockets.put(address, socket);
+    }
+
+    return managedSockets.get(address);
+  }
+
+  /**
+   * Returns a {@link RpcSocket} for the given {@link ZMQ.Socket}
+   * @param socket
+   * @return
+   */
+  public Optional<RpcSocket> getManagedSocketFor(ZMQ.Socket socket) {
+    for (RpcSocket rpcSocket : managedSockets.values()) {
+      if (rpcSocket.getSocket().equals(socket)) {
+        return Optional.of(rpcSocket);
+      }
+    }
+    return Optional.absent();
+  }
+
+  /**
+   * Return a collection of all managed sockets
+   * @return
+   */
+  public Collection<RpcSocket> getManagedSockets() {
+    return managedSockets.values();
+  }
+
+  /**
+   * Returns the {@link ZMQ.Poller}
+   * @return
+   */
+  public ZMQ.Poller getPoller() {
+    return _poller;
+  }
+
+  /**
+   * This should be called when stopping the server to close all the sockets
+   * @return
+   */
+  @Override
+  public void close() throws Exception {
+    log.debug("Stopping...");
+    for (RpcSocket socket : managedSockets.values()) {
+      socket.close();
+    }
+    managedSockets.clear();
+    log.debug("Stopped");
+  }
+}