2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.remote.rpc;
12 import akka.actor.ActorRef;
13 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
14 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
15 import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
16 import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
17 import org.opendaylight.controller.sal.connector.api.RpcRouter;
18 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
19 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
23 import java.util.HashSet;
27 public class RoutedRpcListener implements RouteChangeListener<RpcRoutingContext, InstanceIdentifier>{
28 private static final Logger LOG = LoggerFactory.getLogger(RoutedRpcListener.class);
29 private final ActorRef rpcRegistry;
30 private final String actorPath;
32 public RoutedRpcListener(ActorRef rpcRegistry, String actorPath) {
33 this.rpcRegistry = rpcRegistry;
34 this.actorPath = actorPath;
38 public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
39 Map<RpcRoutingContext, Set<InstanceIdentifier>> announcements = routeChange.getAnnouncements();
40 announce(getRouteIdentifiers(announcements));
42 Map<RpcRoutingContext, Set<InstanceIdentifier>> removals = routeChange.getRemovals();
43 remove(getRouteIdentifiers(removals));
48 * @param announcements
50 private void announce(Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements) {
51 LOG.debug("Announcing [{}]", announcements);
52 AddRoutedRpc addRpcMsg = new AddRoutedRpc(announcements, actorPath);
54 ActorUtil.executeLocalOperation(rpcRegistry, addRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
55 } catch (Exception e) {
56 // Just logging it because Akka API throws this exception
57 LOG.error(e.toString());
65 private void remove(Set<RpcRouter.RouteIdentifier<?, ?, ?>> removals){
66 LOG.debug("Removing [{}]", removals);
67 RemoveRoutedRpc removeRpcMsg = new RemoveRoutedRpc(removals, actorPath);
69 ActorUtil.executeLocalOperation(rpcRegistry, removeRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
70 } catch (Exception e) {
71 // Just logging it because Akka API throws this exception
72 LOG.error(e.toString());
81 private Set<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers(Map<RpcRoutingContext, Set<InstanceIdentifier>> changes) {
82 RouteIdentifierImpl routeId = null;
83 Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdSet = new HashSet<>();
85 for (RpcRoutingContext context : changes.keySet()){
86 for (InstanceIdentifier instanceId : changes.get(context)){
87 routeId = new RouteIdentifierImpl(null, context.getRpc(), instanceId);
88 routeIdSet.add(routeId);