Merge "BUG 2221 : Add metering to ShardTransaction actor"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.dispatch.Mapper;
17 import akka.util.Timeout;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
21 import org.opendaylight.controller.cluster.datastore.Configuration;
22 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
23 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
24 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
25 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
26 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
27 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
28 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
29 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
30 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
31 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
32 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
33 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import scala.concurrent.Await;
37 import scala.concurrent.Future;
38 import scala.concurrent.duration.Duration;
39 import scala.concurrent.duration.FiniteDuration;
40 import java.util.concurrent.TimeUnit;
41 import static akka.pattern.Patterns.ask;
42
43 /**
44  * The ActorContext class contains utility methods which could be used by
45  * non-actors (like DistributedDataStore) to work with actors a little more
46  * easily. An ActorContext can be freely passed around to local object instances
47  * but should not be passed to actors especially remote actors
48  */
49 public class ActorContext {
50     private static final Logger
51         LOG = LoggerFactory.getLogger(ActorContext.class);
52
53     private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS);
54
55     public static final String MAILBOX = "bounded-mailbox";
56
57     private final ActorSystem actorSystem;
58     private final ActorRef shardManager;
59     private final ClusterWrapper clusterWrapper;
60     private final Configuration configuration;
61     private volatile SchemaContext schemaContext;
62     private FiniteDuration operationDuration = DEFAULT_OPER_DURATION;
63     private Timeout operationTimeout = new Timeout(operationDuration);
64
65     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
66         ClusterWrapper clusterWrapper,
67         Configuration configuration) {
68         this.actorSystem = actorSystem;
69         this.shardManager = shardManager;
70         this.clusterWrapper = clusterWrapper;
71         this.configuration = configuration;
72     }
73
74     public ActorSystem getActorSystem() {
75         return actorSystem;
76     }
77
78     public ActorRef getShardManager() {
79         return shardManager;
80     }
81
82     public ActorSelection actorSelection(String actorPath) {
83         return actorSystem.actorSelection(actorPath);
84     }
85
86     public ActorSelection actorSelection(ActorPath actorPath) {
87         return actorSystem.actorSelection(actorPath);
88     }
89
90     public void setSchemaContext(SchemaContext schemaContext) {
91         this.schemaContext = schemaContext;
92
93         if(shardManager != null) {
94             shardManager.tell(new UpdateSchemaContext(schemaContext), null);
95         }
96     }
97
98     public void setOperationTimeout(int timeoutInSeconds) {
99         operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS);
100         operationTimeout = new Timeout(operationDuration);
101     }
102
103     public SchemaContext getSchemaContext() {
104         return schemaContext;
105     }
106
107     /**
108      * Finds the primary shard for the given shard name
109      *
110      * @param shardName
111      * @return
112      */
113     public Optional<ActorSelection> findPrimaryShard(String shardName) {
114         String path = findPrimaryPathOrNull(shardName);
115         if (path == null){
116             return Optional.absent();
117         }
118         return Optional.of(actorSystem.actorSelection(path));
119     }
120
121     /**
122      * Finds a local shard given its shard name and return it's ActorRef
123      *
124      * @param shardName the name of the local shard that needs to be found
125      * @return a reference to a local shard actor which represents the shard
126      *         specified by the shardName
127      */
128     public Optional<ActorRef> findLocalShard(String shardName) {
129         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
130
131         if (result instanceof LocalShardFound) {
132             LocalShardFound found = (LocalShardFound) result;
133             LOG.debug("Local shard found {}", found.getPath());
134             return Optional.of(found.getPath());
135         }
136
137         return Optional.absent();
138     }
139
140     /**
141      * Finds a local shard async given its shard name and return a Future from which to obtain the
142      * ActorRef.
143      *
144      * @param shardName the name of the local shard that needs to be found
145      */
146     public Future<ActorRef> findLocalShardAsync( final String shardName, Timeout timeout) {
147         Future<Object> future = executeOperationAsync(shardManager,
148                 new FindLocalShard(shardName, true), timeout);
149
150         return future.map(new Mapper<Object, ActorRef>() {
151             @Override
152             public ActorRef checkedApply(Object response) throws Throwable {
153                 if(response instanceof LocalShardFound) {
154                     LocalShardFound found = (LocalShardFound)response;
155                     LOG.debug("Local shard found {}", found.getPath());
156                     return found.getPath();
157                 } else if(response instanceof ActorNotInitialized) {
158                     throw new NotInitializedException(
159                             String.format("Found local shard for %s but it's not initialized yet.",
160                                     shardName));
161                 } else if(response instanceof LocalShardNotFound) {
162                     throw new LocalShardNotFoundException(
163                             String.format("Local shard for %s does not exist.", shardName));
164                 }
165
166                 throw new UnknownMessageException(String.format(
167                         "FindLocalShard returned unkown response: %s", response));
168             }
169         }, getActorSystem().dispatcher());
170     }
171
172     private String findPrimaryPathOrNull(String shardName) {
173         Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
174
175         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
176             PrimaryFound found = PrimaryFound.fromSerializable(result);
177
178             LOG.debug("Primary found {}", found.getPrimaryPath());
179             return found.getPrimaryPath();
180
181         } else if (result.getClass().equals(ActorNotInitialized.class)){
182             throw new NotInitializedException(
183                 String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
184             );
185
186         } else {
187             return null;
188         }
189     }
190
191
192     /**
193      * Executes an operation on a local actor and wait for it's response
194      *
195      * @param actor
196      * @param message
197      * @return The response of the operation
198      */
199     public Object executeOperation(ActorRef actor, Object message) {
200         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
201
202         try {
203             return Await.result(future, operationDuration);
204         } catch (Exception e) {
205             throw new TimeoutException("Sending message " + message.getClass().toString() +
206                     " to actor " + actor.toString() + " failed. Try again later.", e);
207         }
208     }
209
210     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
211         Preconditions.checkArgument(actor != null, "actor must not be null");
212         Preconditions.checkArgument(message != null, "message must not be null");
213
214         LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
215         return ask(actor, message, timeout);
216     }
217
218     /**
219      * Execute an operation on a remote actor and wait for it's response
220      *
221      * @param actor
222      * @param message
223      * @return
224      */
225     public Object executeOperation(ActorSelection actor, Object message) {
226         Future<Object> future = executeOperationAsync(actor, message);
227
228         try {
229             return Await.result(future, operationDuration);
230         } catch (Exception e) {
231             throw new TimeoutException("Sending message " + message.getClass().toString() +
232                     " to actor " + actor.toString() + " failed. Try again later.", e);
233         }
234     }
235
236     /**
237      * Execute an operation on a remote actor asynchronously.
238      *
239      * @param actor the ActorSelection
240      * @param message the message to send
241      * @return a Future containing the eventual result
242      */
243     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
244         Preconditions.checkArgument(actor != null, "actor must not be null");
245         Preconditions.checkArgument(message != null, "message must not be null");
246
247         LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
248
249         return ask(actor, message, operationTimeout);
250     }
251
252     /**
253      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
254      * reply (essentially set and forget).
255      *
256      * @param actor the ActorSelection
257      * @param message the message to send
258      */
259     public void sendOperationAsync(ActorSelection actor, Object message) {
260         Preconditions.checkArgument(actor != null, "actor must not be null");
261         Preconditions.checkArgument(message != null, "message must not be null");
262
263         LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
264
265         actor.tell(message, ActorRef.noSender());
266     }
267
268     public void shutdown() {
269         shardManager.tell(PoisonPill.getInstance(), null);
270         actorSystem.shutdown();
271     }
272
273     public ClusterWrapper getClusterWrapper() {
274         return clusterWrapper;
275     }
276
277     public String getCurrentMemberName(){
278         return clusterWrapper.getCurrentMemberName();
279     }
280
281     /**
282      * Send the message to each and every shard
283      *
284      * @param message
285      */
286     public void broadcast(Object message){
287         for(String shardName : configuration.getAllShardNames()){
288
289             Optional<ActorSelection> primary = findPrimaryShard(shardName);
290             if (primary.isPresent()) {
291                 primary.get().tell(message, ActorRef.noSender());
292             } else {
293                 LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
294                         message.getClass().getSimpleName(), shardName);
295             }
296         }
297     }
298
299     public FiniteDuration getOperationDuration() {
300         return operationDuration;
301     }
302
303     public boolean isLocalPath(String path) {
304         String selfAddress = clusterWrapper.getSelfAddress();
305         if (path == null || selfAddress == null) {
306             return false;
307         }
308
309         int atIndex1 = path.indexOf("@");
310         int atIndex2 = selfAddress.indexOf("@");
311
312         if (atIndex1 == -1 || atIndex2 == -1) {
313             return false;
314         }
315
316         int slashIndex1 = path.indexOf("/", atIndex1);
317         int slashIndex2 = selfAddress.indexOf("/", atIndex2);
318
319         if (slashIndex1 == -1 || slashIndex2 == -1) {
320             return false;
321         }
322
323         String hostPort1 = path.substring(atIndex1, slashIndex1);
324         String hostPort2 = selfAddress.substring(atIndex2, slashIndex2);
325
326         return hostPort1.equals(hostPort2);
327     }
328 }