Add support for reusable streaming
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcListener.java
index f6149906692ad0e5133938068508e80364d0769e..20f32cb0da4a4a26a625c1ae901d90ea34016ca3 100644 (file)
@@ -5,55 +5,54 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.remote.rpc;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorRef;
-import org.opendaylight.controller.remote.rpc.messages.AddRpc;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
-import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
-import org.opendaylight.yangtools.yang.common.QName;
+import java.util.Collection;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
+import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMRpcImplementation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RpcListener implements RpcRegistrationListener{
-
-  private static final Logger LOG = LoggerFactory.getLogger(RpcListener.class);
-  private final ActorRef rpcRegistry;
-  private final String actorPath;
-
-  public RpcListener(ActorRef rpcRegistry, String actorPath) {
-    this.rpcRegistry = rpcRegistry;
-    this.actorPath = actorPath;
-  }
-
-  @Override
-  public void onRpcImplementationAdded(QName rpc) {
-    LOG.debug("Adding registration for [{}]", rpc);
-    RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null);
-    AddRpc addRpcMsg = new AddRpc(routeId, actorPath);
-    try {
-      ActorUtil.executeLocalOperation(rpcRegistry, addRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
-      LOG.debug("Route added [{}-{}]", routeId, this.actorPath);
-    } catch (Exception e) {
-      // Just logging it because Akka API throws this exception
-      LOG.error(e.toString());
+/**
+ * A {@link DOMRpcAvailabilityListener} reacting to RPC implementations different than {@link RemoteRpcImplementation}.
+ * The knowledge of such implementations is forwarded to {@link RpcRegistry}, which is responsible for advertising
+ * their presence to other nodes.
+ */
+final class RpcListener implements DOMRpcAvailabilityListener {
+    private static final Logger LOG = LoggerFactory.getLogger(RpcListener.class);
+
+    private final ActorRef rpcRegistry;
+
+    RpcListener(final ActorRef rpcRegistry) {
+        this.rpcRegistry = requireNonNull(rpcRegistry);
+    }
+
+    @Override
+    public void onRpcAvailable(final Collection<DOMRpcIdentifier> rpcs) {
+        checkArgument(rpcs != null, "Input Collection of DOMRpcIdentifier can not be null.");
+        LOG.debug("Adding registration for [{}]", rpcs);
+
+        rpcRegistry.tell(new AddOrUpdateRoutes(rpcs), ActorRef.noSender());
+    }
+
+    @Override
+    public void onRpcUnavailable(final Collection<DOMRpcIdentifier> rpcs) {
+        checkArgument(rpcs != null, "Input Collection of DOMRpcIdentifier can not be null.");
+
+        LOG.debug("Removing registration for [{}]", rpcs);
+        rpcRegistry.tell(new RemoveRoutes(rpcs), ActorRef.noSender());
     }
 
-  }
-
-  @Override
-  public void onRpcImplementationRemoved(QName rpc) {
-    LOG.debug("Removing registration for [{}]", rpc);
-    RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null);
-    RemoveRpc removeRpcMsg = new RemoveRpc(routeId);
-    try {
-      ActorUtil.executeLocalOperation(rpcRegistry, removeRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
-    } catch (Exception e) {
-      // Just logging it because Akka API throws this exception
-      LOG.error(e.toString());
+    @Override
+    public boolean acceptsImplementation(final DOMRpcImplementation impl) {
+        return !(impl instanceof RemoteRpcImplementation);
     }
-  }
 }