Merge "Bug 1236 - Documented Binding-aware RPC services of MD-SAL"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.util.Timeout;
16 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
17 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20 import scala.concurrent.Await;
21 import scala.concurrent.Future;
22 import scala.concurrent.duration.Duration;
23 import scala.concurrent.duration.FiniteDuration;
24
25 import java.util.concurrent.TimeUnit;
26
27 import static akka.pattern.Patterns.ask;
28
29 /**
30  * The ActorContext class contains utility methods which could be used by
31  * non-actors (like DistributedDataStore) to work with actors a little more
32  * easily. An ActorContext can be freely passed around to local object instances
33  * but should not be passed to actors especially remote actors
34  */
35 public class ActorContext {
36     private static final Logger
37         LOG = LoggerFactory.getLogger(ActorContext.class);
38
39     public static final FiniteDuration ASK_DURATION = Duration.create(5, TimeUnit.SECONDS);
40     public static final Duration AWAIT_DURATION = Duration.create(5, TimeUnit.SECONDS);
41
42     private final ActorSystem actorSystem;
43     private final ActorRef shardManager;
44
45     public ActorContext(ActorSystem actorSystem, ActorRef shardManager){
46         this.actorSystem = actorSystem;
47         this.shardManager = shardManager;
48     }
49
50     public ActorSystem getActorSystem() {
51         return actorSystem;
52     }
53
54     public ActorRef getShardManager() {
55         return shardManager;
56     }
57
58     public ActorSelection actorSelection(String actorPath){
59         return actorSystem.actorSelection(actorPath);
60     }
61
62     public ActorSelection actorSelection(ActorPath actorPath){
63         return actorSystem.actorSelection(actorPath);
64     }
65
66
67     /**
68      * Finds the primary for a given shard
69      *
70      * @param shardName
71      * @return
72      */
73     public ActorSelection findPrimary(String shardName) {
74         Object result = executeLocalOperation(shardManager,
75             new FindPrimary(shardName), ASK_DURATION);
76
77         if(result instanceof PrimaryFound){
78             PrimaryFound found = (PrimaryFound) result;
79
80             LOG.error("Primary found {}", found.getPrimaryPath());
81
82             return actorSystem.actorSelection(found.getPrimaryPath());
83         }
84         throw new RuntimeException("primary was not found");
85     }
86
87     /**
88      * Executes an operation on a local actor and wait for it's response
89      * @param actor
90      * @param message
91      * @param duration
92      * @return The response of the operation
93      */
94     public Object executeLocalOperation(ActorRef actor, Object message,
95         FiniteDuration duration){
96         Future<Object> future =
97             ask(actor, message, new Timeout(duration));
98
99         try {
100             return Await.result(future, AWAIT_DURATION);
101         } catch (Exception e) {
102             throw new RuntimeException(e);
103         }
104     }
105
106     /**
107      * Execute an operation on a remote actor and wait for it's response
108      * @param actor
109      * @param message
110      * @param duration
111      * @return
112      */
113     public Object executeRemoteOperation(ActorSelection actor, Object message,
114         FiniteDuration duration){
115         Future<Object> future =
116             ask(actor, message, new Timeout(duration));
117
118         try {
119             return Await.result(future, AWAIT_DURATION);
120         } catch (Exception e) {
121             throw new RuntimeException(e);
122         }
123     }
124
125     /**
126      * Execute an operation on the primary for a given shard
127      * <p>
128      *     This method first finds the primary for a given shard ,then sends
129      *     the message to the remote shard and waits for a response
130      * </p>
131      * @param shardName
132      * @param message
133      * @param duration
134      * @throws java.lang.RuntimeException when a primary is not found or if the message to the remote shard fails or times out
135      *
136      * @return
137      */
138     public Object executeShardOperation(String shardName, Object message, FiniteDuration duration){
139         ActorSelection primary = findPrimary(shardName);
140
141         return executeRemoteOperation(primary, message, duration);
142     }
143
144 }