/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.remote.rpc.registry;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Props;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.japi.Creator;
import org.opendaylight.controller.remote.rpc.AbstractUntypedActor;
import org.opendaylight.controller.remote.rpc.ActorConstants;
import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
import org.opendaylight.controller.remote.rpc.messages.AddRpc;
import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
import org.opendaylight.controller.remote.rpc.messages.GetRpc;
import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
import org.opendaylight.controller.remote.rpc.messages.RoutingTableData;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
42 * This Actor maintains the routing table state and sync it with other nodes in the cluster.
44 * A scheduler runs after an interval of time, which pick a random member from the cluster
45 * and send the current state of routing table to the member.
47 * when a message of routing table data is received, it gets merged with the local routing table
48 * to keep the latest data.
51 public class RpcRegistryOld extends AbstractUntypedActor {
53 private static final Logger LOG = LoggerFactory.getLogger(RpcRegistryOld.class);
54 private RoutingTableOld<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable;
55 private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
56 private final ClusterWrapper clusterWrapper;
57 private final ScheduledFuture<?> syncScheduler;
59 private RpcRegistryOld(ClusterWrapper clusterWrapper){
60 this.routingTable = new RoutingTableOld<>();
61 this.clusterWrapper = clusterWrapper;
62 this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS);
65 public static Props props(final ClusterWrapper clusterWrapper){
66 return Props.create(new Creator<RpcRegistryOld>(){
69 public RpcRegistryOld create() throws Exception {
70 return new RpcRegistryOld(clusterWrapper);
76 protected void handleReceive(Object message) throws Exception {
77 LOG.debug("Received message {}", message);
78 if(message instanceof RoutingTableData) {
79 syncRoutingTable((RoutingTableData) message);
80 } else if(message instanceof GetRoutedRpc) {
81 getRoutedRpc((GetRoutedRpc) message);
82 } else if(message instanceof GetRpc) {
83 getRpc((GetRpc) message);
84 } else if(message instanceof AddRpc) {
85 addRpc((AddRpc) message);
86 } else if(message instanceof RemoveRpc) {
87 removeRpc((RemoveRpc) message);
88 } else if(message instanceof AddRoutedRpc) {
89 addRoutedRpc((AddRoutedRpc) message);
90 } else if(message instanceof RemoveRoutedRpc) {
91 removeRoutedRpc((RemoveRoutedRpc) message);
95 private void getRoutedRpc(GetRoutedRpc rpcMsg){
96 LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg);
97 String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId());
98 GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath);
100 getSender().tell(routedRpcReply, self());
103 private void getRpc(GetRpc rpcMsg) {
104 LOG.debug("Get global Rpc location from routing table {}", rpcMsg);
105 String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId());
106 GetRpcReply rpcReply = new GetRpcReply(remoteActorPath);
108 getSender().tell(rpcReply, self());
111 private void addRpc(AddRpc rpcMsg) {
112 LOG.debug("Add Rpc to routing table {}", rpcMsg);
113 routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath());
115 getSender().tell("Success", self());
118 private void removeRpc(RemoveRpc rpcMsg) {
119 LOG.debug("Removing Rpc to routing table {}", rpcMsg);
120 routingTable.removeGlobalRoute(rpcMsg.getRouteId());
122 getSender().tell("Success", self());
125 private void addRoutedRpc(AddRoutedRpc rpcMsg) {
126 routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
127 getSender().tell("Success", self());
130 private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) {
131 routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
132 getSender().tell("Success", self());
135 private void syncRoutingTable(RoutingTableData routingTableData) {
136 LOG.debug("Syncing routing table {}", routingTableData);
138 Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> newRpcMap = routingTableData.getRpcMap();
139 Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = newRpcMap.keySet();
140 for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
141 routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId));
144 Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> newRoutedRpcMap =
145 routingTableData.getRoutedRpcMap();
146 routeIds = newRoutedRpcMap.keySet();
148 for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
149 Set<String> routeAddresses = newRoutedRpcMap.get(routeId);
150 for(String routeAddress : routeAddresses) {
151 routingTable.addRoutedRpc(routeId, routeAddress);
156 private ActorSelection getRandomRegistryActor() {
157 ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState();
158 ActorSelection actor = null;
159 Set<Member> members = JavaConversions.asJavaSet(clusterState.members());
160 int memberSize = members.size();
161 // Don't select yourself
163 Address currentNodeAddress = clusterWrapper.getAddress();
164 int index = new Random().nextInt(memberSize);
166 // keeping previous member, in case when random index member is same as current actor
167 // and current actor member is last in set
168 Member previousMember = null;
169 for(Member member : members){
171 previousMember = member;
174 if(!currentNodeAddress.equals(member.address())) {
175 actor = this.context().actorSelection(member.address() + ActorConstants.RPC_REGISTRY_PATH);
177 } else if(index < memberSize-1){ // pick the next element in the set
183 if(actor == null && previousMember != null) {
184 actor = this.context().actorSelection(previousMember.address() + ActorConstants.RPC_REGISTRY_PATH);
190 private class SendRoutingTable implements Runnable {
194 RoutingTableData routingTableData =
195 new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap());
196 LOG.debug("Sending routing table for sync {}", routingTableData);
197 ActorSelection actor = getRandomRegistryActor();
199 actor.tell(routingTableData, self());