Merge "Fixed for bug 1197"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import akka.actor.ActorSelection;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import akka.cluster.ClusterEvent;
14 import akka.cluster.Member;
15 import akka.japi.Creator;
16 import org.opendaylight.controller.remote.rpc.AbstractUntypedActor;
17 import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
18 import org.opendaylight.controller.remote.rpc.messages.AddRpc;
19 import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
20 import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
21 import org.opendaylight.controller.remote.rpc.messages.GetRpc;
22 import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
23 import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
24 import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
25 import org.opendaylight.controller.remote.rpc.messages.RoutingTableData;
26 import org.opendaylight.controller.sal.connector.api.RpcRouter;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import scala.collection.JavaConversions;
30
31 import java.util.LinkedHashSet;
32 import java.util.Map;
33 import java.util.Random;
34 import java.util.Set;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.ScheduledFuture;
38 import java.util.concurrent.TimeUnit;
39
40 /**
41  * This Actor maintains the routing table state and sync it with other nodes in the cluster.
42  *
43  * A scheduler runs after an interval of time, which pick a random member from the cluster
44  * and send the current state of routing table to the member.
45  *
46  * when a message of routing table data is received, it gets merged with the local routing table
47  * to keep the latest data.
48  */
49
50 public class RpcRegistry extends AbstractUntypedActor {
51
52   private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class);
53   private RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable;
54   private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
55   private final ClusterWrapper clusterWrapper;
56   private final ScheduledFuture<?> syncScheduler;
57
58   private RpcRegistry(ClusterWrapper clusterWrapper){
59     this.routingTable = new RoutingTable<>();
60     this.clusterWrapper = clusterWrapper;
61     this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS);
62   }
63
64   public static Props props(final ClusterWrapper clusterWrapper){
65     return Props.create(new Creator<RpcRegistry>(){
66
67       @Override
68       public RpcRegistry create() throws Exception {
69         return new RpcRegistry(clusterWrapper);
70       }
71     });
72   }
73
74   @Override
75   protected void handleReceive(Object message) throws Exception {
76     LOG.debug("Received message {}", message);
77     if(message instanceof RoutingTableData) {
78       syncRoutingTable((RoutingTableData) message);
79     } else if(message instanceof GetRoutedRpc) {
80       getRoutedRpc((GetRoutedRpc) message);
81     } else if(message instanceof GetRpc) {
82       getRpc((GetRpc) message);
83     } else if(message instanceof AddRpc) {
84       addRpc((AddRpc) message);
85     } else if(message instanceof RemoveRpc) {
86       removeRpc((RemoveRpc) message);
87     } else if(message instanceof AddRoutedRpc) {
88       addRoutedRpc((AddRoutedRpc) message);
89     } else if(message instanceof RemoveRoutedRpc) {
90       removeRoutedRpc((RemoveRoutedRpc) message);
91     }
92   }
93
94   private void getRoutedRpc(GetRoutedRpc rpcMsg){
95     LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg);
96     String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId());
97     GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath);
98
99     getSender().tell(routedRpcReply, self());
100   }
101
102   private void getRpc(GetRpc rpcMsg) {
103     LOG.debug("Get global Rpc location from routing table {}", rpcMsg);
104     String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId());
105     GetRpcReply rpcReply = new GetRpcReply(remoteActorPath);
106
107     getSender().tell(rpcReply, self());
108   }
109
110   private void addRpc(AddRpc rpcMsg) {
111     LOG.debug("Add Rpc to routing table {}", rpcMsg);
112     routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath());
113
114     getSender().tell("Success", self());
115   }
116
117   private void removeRpc(RemoveRpc rpcMsg) {
118     LOG.debug("Removing Rpc to routing table {}", rpcMsg);
119     routingTable.removeGlobalRoute(rpcMsg.getRouteId());
120
121     getSender().tell("Success", self());
122   }
123
124   private void addRoutedRpc(AddRoutedRpc rpcMsg) {
125     routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
126     getSender().tell("Success", self());
127   }
128
129   private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) {
130     routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
131     getSender().tell("Success", self());
132   }
133
134   private void syncRoutingTable(RoutingTableData routingTableData) {
135     LOG.debug("Syncing routing table {}", routingTableData);
136
137     Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> newRpcMap = routingTableData.getRpcMap();
138     Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = newRpcMap.keySet();
139     for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
140       routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId));
141     }
142
143     Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> newRoutedRpcMap =
144         routingTableData.getRoutedRpcMap();
145     routeIds = newRoutedRpcMap.keySet();
146
147     for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
148       Set<String> routeAddresses = newRoutedRpcMap.get(routeId);
149       for(String routeAddress : routeAddresses) {
150         routingTable.addRoutedRpc(routeId, routeAddress);
151       }
152     }
153   }
154
155   private ActorSelection getRandomRegistryActor() {
156     ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState();
157     ActorSelection actor = null;
158     Set<Member> members = JavaConversions.asJavaSet(clusterState.members());
159     int memberSize = members.size();
160     // Don't select yourself
161     if(memberSize > 1) {
162       Address currentNodeAddress = clusterWrapper.getAddress();
163       int index = new Random().nextInt(memberSize);
164       int i = 0;
165       // keeping previous member, in case when random index member is same as current actor
166       // and current actor member is last in set
167       Member previousMember = null;
168       for(Member member : members){
169         if(i == index-1) {
170           previousMember = member;
171         }
172         if(i == index) {
173           if(!currentNodeAddress.equals(member.address())) {
174             actor = this.context().actorSelection(member.address() + "/user/rpc-registry");
175             break;
176           } else if(index < memberSize-1){ // pick the next element in the set
177             index++;
178           }
179         }
180         i++;
181       }
182       if(actor == null && previousMember != null) {
183         actor = this.context().actorSelection(previousMember.address() + "/user/rpc-registry");
184       }
185     }
186     return actor;
187   }
188
189   private class SendRoutingTable implements Runnable {
190
191     @Override
192     public void run() {
193       RoutingTableData routingTableData =
194           new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap());
195       LOG.debug("Sending routing table for sync {}", routingTableData);
196       ActorSelection actor = getRandomRegistryActor();
197       if(actor != null) {
198         actor.tell(routingTableData, self());
199       }
200     }
201   }
202 }