Added hosttracker shell for karaf (rebased)
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.util.Timeout;
17 import com.google.common.cache.CacheBuilder;
18 import com.google.common.cache.CacheLoader;
19 import com.google.common.cache.LoadingCache;
20 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
21 import org.opendaylight.controller.cluster.datastore.Configuration;
22 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
23 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
24 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
25 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
26 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import scala.concurrent.Await;
30 import scala.concurrent.Future;
31 import scala.concurrent.duration.Duration;
32 import scala.concurrent.duration.FiniteDuration;
33
34 import java.util.concurrent.TimeUnit;
35
36 import static akka.pattern.Patterns.ask;
37
38 /**
39  * The ActorContext class contains utility methods which could be used by
40  * non-actors (like DistributedDataStore) to work with actors a little more
41  * easily. An ActorContext can be freely passed around to local object instances
42  * but should not be passed to actors especially remote actors
43  */
44 public class ActorContext {
45     private static final Logger
46         LOG = LoggerFactory.getLogger(ActorContext.class);
47
48     public static final FiniteDuration ASK_DURATION =
49         Duration.create(5, TimeUnit.SECONDS);
50     public static final Duration AWAIT_DURATION =
51         Duration.create(5, TimeUnit.SECONDS);
52
53     private final ActorSystem actorSystem;
54     private final ActorRef shardManager;
55     private final ClusterWrapper clusterWrapper;
56     private final Configuration configuration;
57
58     private SchemaContext schemaContext = null;
59
60     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
61         ClusterWrapper clusterWrapper,
62         Configuration configuration) {
63         this.actorSystem = actorSystem;
64         this.shardManager = shardManager;
65         this.clusterWrapper = clusterWrapper;
66         this.configuration = configuration;
67     }
68
69     public ActorSystem getActorSystem() {
70         return actorSystem;
71     }
72
73     public ActorRef getShardManager() {
74         return shardManager;
75     }
76
77     public ActorSelection actorSelection(String actorPath) {
78         return actorSystem.actorSelection(actorPath);
79     }
80
81     public ActorSelection actorSelection(ActorPath actorPath) {
82         return actorSystem.actorSelection(actorPath);
83     }
84
85
86     /**
87      * Finds the primary for a given shard
88      *
89      * @param shardName
90      * @return
91      */
92     public ActorSelection findPrimary(String shardName) {
93         String path = findPrimaryPath(shardName);
94         return actorSystem.actorSelection(path);
95     }
96
97     public String findPrimaryPath(String shardName) {
98         Object result = executeLocalOperation(shardManager,
99             new FindPrimary(shardName).toSerializable(), ASK_DURATION);
100
101         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
102             PrimaryFound found = PrimaryFound.fromSerializable(result);
103
104             LOG.debug("Primary found {}", found.getPrimaryPath());
105
106             return found.getPrimaryPath();
107         }
108         throw new PrimaryNotFoundException();
109     }
110
111
112     /**
113      * Executes an operation on a local actor and wait for it's response
114      *
115      * @param actor
116      * @param message
117      * @param duration
118      * @return The response of the operation
119      */
120     public Object executeLocalOperation(ActorRef actor, Object message,
121         FiniteDuration duration) {
122         Future<Object> future =
123             ask(actor, message, new Timeout(duration));
124
125         try {
126             return Await.result(future, AWAIT_DURATION);
127         } catch (Exception e) {
128             throw new TimeoutException(e);
129         }
130     }
131
132     /**
133      * Execute an operation on a remote actor and wait for it's response
134      *
135      * @param actor
136      * @param message
137      * @param duration
138      * @return
139      */
140     public Object executeRemoteOperation(ActorSelection actor, Object message,
141         FiniteDuration duration) {
142
143         LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
144
145         Future<Object> future =
146             ask(actor, message, new Timeout(duration));
147
148         try {
149             return Await.result(future, AWAIT_DURATION);
150         } catch (Exception e) {
151             throw new TimeoutException(e);
152         }
153     }
154
155     /**
156      * Execute an operation on the primary for a given shard
157      * <p>
158      * This method first finds the primary for a given shard ,then sends
159      * the message to the remote shard and waits for a response
160      * </p>
161      *
162      * @param shardName
163      * @param message
164      * @param duration
165      * @return
166      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException         if the message to the remote shard times out
167      * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
168      */
169     public Object executeShardOperation(String shardName, Object message,
170         FiniteDuration duration) {
171         ActorSelection primary = findPrimary(shardName);
172
173         return executeRemoteOperation(primary, message, duration);
174     }
175
176     public void shutdown() {
177         shardManager.tell(PoisonPill.getInstance(), null);
178         actorSystem.shutdown();
179     }
180
181     public String getRemoteActorPath(final String shardName,
182         final String localPathOfRemoteActor) {
183         final String path = findPrimaryPath(shardName);
184
185         LoadingCache<String, String> graphs = CacheBuilder.newBuilder()
186             .expireAfterAccess(2, TimeUnit.SECONDS)
187             .build(
188                 new CacheLoader<String, String>() {
189                     public String load(String key) {
190                         return resolvePath(path, localPathOfRemoteActor);
191                     }
192                 }
193             );
194         return graphs.getUnchecked(localPathOfRemoteActor);
195     }
196
197     public String resolvePath(final String primaryPath,
198         final String localPathOfRemoteActor) {
199         StringBuilder builder = new StringBuilder();
200         String[] primaryPathElements = primaryPath.split("/");
201         builder.append(primaryPathElements[0]).append("//")
202             .append(primaryPathElements[1]).append(primaryPathElements[2]);
203         String[] remotePathElements = localPathOfRemoteActor.split("/");
204         for (int i = 3; i < remotePathElements.length; i++) {
205             builder.append("/").append(remotePathElements[i]);
206         }
207
208         return builder.toString();
209
210     }
211
212     public ActorPath actorFor(String path){
213         return actorSystem.actorFor(path).path();
214     }
215
216     public String getCurrentMemberName(){
217         return clusterWrapper.getCurrentMemberName();
218     }
219
220 }