2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.remote.rpc.registry;
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Props;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.japi.Creator;
import org.opendaylight.controller.remote.rpc.AbstractUntypedActor;
import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
import org.opendaylight.controller.remote.rpc.messages.AddRpc;
import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
import org.opendaylight.controller.remote.rpc.messages.GetRpc;
import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
import org.opendaylight.controller.remote.rpc.messages.RoutingTableData;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
41 * This Actor maintains the routing table state and sync it with other nodes in the cluster.
43 * A scheduler runs after an interval of time, which pick a random member from the cluster
44 * and send the current state of routing table to the member.
46 * when a message of routing table data is received, it gets merged with the local routing table
47 * to keep the latest data.
50 public class RpcRegistry extends AbstractUntypedActor {
52 private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class);
53 private RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable;
54 private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
55 private final ClusterWrapper clusterWrapper;
56 private final ScheduledFuture<?> syncScheduler;
58 private RpcRegistry(ClusterWrapper clusterWrapper){
59 this.routingTable = new RoutingTable<>();
60 this.clusterWrapper = clusterWrapper;
61 this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS);
64 public static Props props(final ClusterWrapper clusterWrapper){
65 return Props.create(new Creator<RpcRegistry>(){
68 public RpcRegistry create() throws Exception {
69 return new RpcRegistry(clusterWrapper);
75 protected void handleReceive(Object message) throws Exception {
76 LOG.debug("Received message {}", message);
77 if(message instanceof RoutingTableData) {
78 syncRoutingTable((RoutingTableData) message);
79 } else if(message instanceof GetRoutedRpc) {
80 getRoutedRpc((GetRoutedRpc) message);
81 } else if(message instanceof GetRpc) {
82 getRpc((GetRpc) message);
83 } else if(message instanceof AddRpc) {
84 addRpc((AddRpc) message);
85 } else if(message instanceof RemoveRpc) {
86 removeRpc((RemoveRpc) message);
87 } else if(message instanceof AddRoutedRpc) {
88 addRoutedRpc((AddRoutedRpc) message);
89 } else if(message instanceof RemoveRoutedRpc) {
90 removeRoutedRpc((RemoveRoutedRpc) message);
94 private void getRoutedRpc(GetRoutedRpc rpcMsg){
95 LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg);
96 String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId());
97 GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath);
99 getSender().tell(routedRpcReply, self());
102 private void getRpc(GetRpc rpcMsg) {
103 LOG.debug("Get global Rpc location from routing table {}", rpcMsg);
104 String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId());
105 GetRpcReply rpcReply = new GetRpcReply(remoteActorPath);
107 getSender().tell(rpcReply, self());
110 private void addRpc(AddRpc rpcMsg) {
111 LOG.debug("Add Rpc to routing table {}", rpcMsg);
112 routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath());
114 getSender().tell("Success", self());
117 private void removeRpc(RemoveRpc rpcMsg) {
118 LOG.debug("Removing Rpc to routing table {}", rpcMsg);
119 routingTable.removeGlobalRoute(rpcMsg.getRouteId());
121 getSender().tell("Success", self());
124 private void addRoutedRpc(AddRoutedRpc rpcMsg) {
125 routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
126 getSender().tell("Success", self());
129 private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) {
130 routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
131 getSender().tell("Success", self());
134 private void syncRoutingTable(RoutingTableData routingTableData) {
135 LOG.debug("Syncing routing table {}", routingTableData);
137 Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> newRpcMap = routingTableData.getRpcMap();
138 Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = newRpcMap.keySet();
139 for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
140 routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId));
143 Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> newRoutedRpcMap =
144 routingTableData.getRoutedRpcMap();
145 routeIds = newRoutedRpcMap.keySet();
147 for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
148 Set<String> routeAddresses = newRoutedRpcMap.get(routeId);
149 for(String routeAddress : routeAddresses) {
150 routingTable.addRoutedRpc(routeId, routeAddress);
155 private ActorSelection getRandomRegistryActor() {
156 ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState();
157 ActorSelection actor = null;
158 Set<Member> members = JavaConversions.asJavaSet(clusterState.members());
159 int memberSize = members.size();
160 // Don't select yourself
162 Address currentNodeAddress = clusterWrapper.getAddress();
163 int index = new Random().nextInt(memberSize);
165 // keeping previous member, in case when random index member is same as current actor
166 // and current actor member is last in set
167 Member previousMember = null;
168 for(Member member : members){
170 previousMember = member;
173 if(!currentNodeAddress.equals(member.address())) {
174 actor = this.context().actorSelection(member.address() + "/user/rpc-registry");
176 } else if(index < memberSize-1){ // pick the next element in the set
182 if(actor == null && previousMember != null) {
183 actor = this.context().actorSelection(previousMember.address() + "/user/rpc-registry");
189 private class SendRoutingTable implements Runnable {
193 RoutingTableData routingTableData =
194 new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap());
195 LOG.debug("Sending routing table for sync {}", routingTableData);
196 ActorSelection actor = getRandomRegistryActor();
198 actor.tell(routingTableData, self());