Merge "BUG-2184 Fix config.yang module(add type as a key for modules list)"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.util.Timeout;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Preconditions;
19 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
20 import org.opendaylight.controller.cluster.datastore.Configuration;
21 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
22 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
23 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
24 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
25 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
26 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
27 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
28 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
29 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import scala.concurrent.Await;
33 import scala.concurrent.Future;
34 import scala.concurrent.duration.Duration;
35 import scala.concurrent.duration.FiniteDuration;
36 import java.util.concurrent.TimeUnit;
37 import static akka.pattern.Patterns.ask;
38
39 /**
40  * The ActorContext class contains utility methods which could be used by
41  * non-actors (like DistributedDataStore) to work with actors a little more
42  * easily. An ActorContext can be freely passed around to local object instances
43  * but should not be passed to actors especially remote actors
44  */
45 public class ActorContext {
46     private static final Logger
47         LOG = LoggerFactory.getLogger(ActorContext.class);
48
49     private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS);
50
51     public static final String MAILBOX = "bounded-mailbox";
52
53     private final ActorSystem actorSystem;
54     private final ActorRef shardManager;
55     private final ClusterWrapper clusterWrapper;
56     private final Configuration configuration;
57     private volatile SchemaContext schemaContext;
58     private FiniteDuration operationDuration = DEFAULT_OPER_DURATION;
59     private Timeout operationTimeout = new Timeout(operationDuration);
60
61     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
62         ClusterWrapper clusterWrapper,
63         Configuration configuration) {
64         this.actorSystem = actorSystem;
65         this.shardManager = shardManager;
66         this.clusterWrapper = clusterWrapper;
67         this.configuration = configuration;
68     }
69
70     public ActorSystem getActorSystem() {
71         return actorSystem;
72     }
73
74     public ActorRef getShardManager() {
75         return shardManager;
76     }
77
78     public ActorSelection actorSelection(String actorPath) {
79         return actorSystem.actorSelection(actorPath);
80     }
81
82     public ActorSelection actorSelection(ActorPath actorPath) {
83         return actorSystem.actorSelection(actorPath);
84     }
85
86     public void setSchemaContext(SchemaContext schemaContext) {
87         this.schemaContext = schemaContext;
88
89         if(shardManager != null) {
90             shardManager.tell(new UpdateSchemaContext(schemaContext), null);
91         }
92     }
93
94     public void setOperationTimeout(int timeoutInSeconds) {
95         operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS);
96         operationTimeout = new Timeout(operationDuration);
97     }
98
99     public SchemaContext getSchemaContext() {
100         return schemaContext;
101     }
102
103     /**
104      * Finds the primary shard for the given shard name
105      *
106      * @param shardName
107      * @return
108      */
109     public Optional<ActorSelection> findPrimaryShard(String shardName) {
110         String path = findPrimaryPathOrNull(shardName);
111         if (path == null){
112             return Optional.absent();
113         }
114         return Optional.of(actorSystem.actorSelection(path));
115     }
116
117     /**
118      * Finds a local shard given it's shard name and return it's ActorRef
119      *
120      * @param shardName the name of the local shard that needs to be found
121      * @return a reference to a local shard actor which represents the shard
122      *         specified by the shardName
123      */
124     public Optional<ActorRef> findLocalShard(String shardName) {
125         Object result = executeOperation(shardManager, new FindLocalShard(shardName));
126
127         if (result instanceof LocalShardFound) {
128             LocalShardFound found = (LocalShardFound) result;
129             LOG.debug("Local shard found {}", found.getPath());
130             return Optional.of(found.getPath());
131         }
132
133         return Optional.absent();
134     }
135
136
137     private String findPrimaryPathOrNull(String shardName) {
138         Object result = executeOperation(shardManager, new FindPrimary(shardName).toSerializable());
139
140         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
141             PrimaryFound found = PrimaryFound.fromSerializable(result);
142
143             LOG.debug("Primary found {}", found.getPrimaryPath());
144             return found.getPrimaryPath();
145
146         } else if (result.getClass().equals(ActorNotInitialized.class)){
147             throw new NotInitializedException(
148                 String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
149             );
150
151         } else {
152             return null;
153         }
154     }
155
156
157     /**
158      * Executes an operation on a local actor and wait for it's response
159      *
160      * @param actor
161      * @param message
162      * @return The response of the operation
163      */
164     public Object executeOperation(ActorRef actor, Object message) {
165         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
166
167         try {
168             return Await.result(future, operationDuration);
169         } catch (Exception e) {
170             throw new TimeoutException("Sending message " + message.getClass().toString() +
171                     " to actor " + actor.toString() + " failed. Try again later.", e);
172         }
173     }
174
175     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
176         Preconditions.checkArgument(actor != null, "actor must not be null");
177         Preconditions.checkArgument(message != null, "message must not be null");
178
179         LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
180         return ask(actor, message, timeout);
181     }
182
183     /**
184      * Execute an operation on a remote actor and wait for it's response
185      *
186      * @param actor
187      * @param message
188      * @return
189      */
190     public Object executeOperation(ActorSelection actor, Object message) {
191         Future<Object> future = executeOperationAsync(actor, message);
192
193         try {
194             return Await.result(future, operationDuration);
195         } catch (Exception e) {
196             throw new TimeoutException("Sending message " + message.getClass().toString() +
197                     " to actor " + actor.toString() + " failed. Try again later.", e);
198         }
199     }
200
201     /**
202      * Execute an operation on a remote actor asynchronously.
203      *
204      * @param actor the ActorSelection
205      * @param message the message to send
206      * @return a Future containing the eventual result
207      */
208     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
209         Preconditions.checkArgument(actor != null, "actor must not be null");
210         Preconditions.checkArgument(message != null, "message must not be null");
211
212         LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
213
214         return ask(actor, message, operationTimeout);
215     }
216
217     /**
218      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
219      * reply (essentially set and forget).
220      *
221      * @param actor the ActorSelection
222      * @param message the message to send
223      */
224     public void sendOperationAsync(ActorSelection actor, Object message) {
225         Preconditions.checkArgument(actor != null, "actor must not be null");
226         Preconditions.checkArgument(message != null, "message must not be null");
227
228         LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
229
230         actor.tell(message, ActorRef.noSender());
231     }
232
233     public void shutdown() {
234         shardManager.tell(PoisonPill.getInstance(), null);
235         actorSystem.shutdown();
236     }
237
238     public ClusterWrapper getClusterWrapper() {
239         return clusterWrapper;
240     }
241
242     public String getCurrentMemberName(){
243         return clusterWrapper.getCurrentMemberName();
244     }
245
246     /**
247      * Send the message to each and every shard
248      *
249      * @param message
250      */
251     public void broadcast(Object message){
252         for(String shardName : configuration.getAllShardNames()){
253
254             Optional<ActorSelection> primary = findPrimaryShard(shardName);
255             if (primary.isPresent()) {
256                 primary.get().tell(message, ActorRef.noSender());
257             } else {
258                 LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
259                         message.getClass().getSimpleName(), shardName);
260             }
261         }
262     }
263
264     public FiniteDuration getOperationDuration() {
265         return operationDuration;
266     }
267
268     public boolean isLocalPath(String path) {
269         String selfAddress = clusterWrapper.getSelfAddress();
270         if (path == null || selfAddress == null) {
271             return false;
272         }
273
274         int atIndex1 = path.indexOf("@");
275         int atIndex2 = selfAddress.indexOf("@");
276
277         if (atIndex1 == -1 || atIndex2 == -1) {
278             return false;
279         }
280
281         int slashIndex1 = path.indexOf("/", atIndex1);
282         int slashIndex2 = selfAddress.indexOf("/", atIndex2);
283
284         if (slashIndex1 == -1 || slashIndex2 == -1) {
285             return false;
286         }
287
288         String hostPort1 = path.substring(atIndex1, slashIndex1);
289         String hostPort2 = selfAddress.substring(atIndex2, slashIndex2);
290
291         return hostPort1.equals(hostPort2);
292     }
293 }