c97e07db6d60c867c9f4c1687c9495ae9722a3c9
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.util.Timeout;
17 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
18 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
19 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
20 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23 import scala.concurrent.Await;
24 import scala.concurrent.Future;
25 import scala.concurrent.duration.Duration;
26 import scala.concurrent.duration.FiniteDuration;
27
28 import java.util.concurrent.TimeUnit;
29
30 import static akka.pattern.Patterns.ask;
31
32 /**
33  * The ActorContext class contains utility methods which could be used by
34  * non-actors (like DistributedDataStore) to work with actors a little more
35  * easily. An ActorContext can be freely passed around to local object instances
36  * but should not be passed to actors especially remote actors
37  */
38 public class ActorContext {
39     private static final Logger
40         LOG = LoggerFactory.getLogger(ActorContext.class);
41
42     public static final FiniteDuration ASK_DURATION = Duration.create(5, TimeUnit.SECONDS);
43     public static final Duration AWAIT_DURATION = Duration.create(5, TimeUnit.SECONDS);
44
45     private final ActorSystem actorSystem;
46     private final ActorRef shardManager;
47
48     public ActorContext(ActorSystem actorSystem, ActorRef shardManager){
49         this.actorSystem = actorSystem;
50         this.shardManager = shardManager;
51     }
52
53     public ActorSystem getActorSystem() {
54         return actorSystem;
55     }
56
57     public ActorRef getShardManager() {
58         return shardManager;
59     }
60
61     public ActorSelection actorSelection(String actorPath){
62         return actorSystem.actorSelection(actorPath);
63     }
64
65     public ActorSelection actorSelection(ActorPath actorPath){
66         return actorSystem.actorSelection(actorPath);
67     }
68
69
70     /**
71      * Finds the primary for a given shard
72      *
73      * @param shardName
74      * @return
75      */
76     public ActorSelection findPrimary(String shardName) {
77         Object result = executeLocalOperation(shardManager,
78             new FindPrimary(shardName), ASK_DURATION);
79
80         if(result instanceof PrimaryFound){
81             PrimaryFound found = (PrimaryFound) result;
82
83             LOG.error("Primary found {}", found.getPrimaryPath());
84
85             return actorSystem.actorSelection(found.getPrimaryPath());
86         }
87         throw new PrimaryNotFoundException();
88     }
89
90     /**
91      * Executes an operation on a local actor and wait for it's response
92      * @param actor
93      * @param message
94      * @param duration
95      * @return The response of the operation
96      */
97     public Object executeLocalOperation(ActorRef actor, Object message,
98         FiniteDuration duration){
99         Future<Object> future =
100             ask(actor, message, new Timeout(duration));
101
102         try {
103             return Await.result(future, AWAIT_DURATION);
104         } catch (Exception e) {
105             throw new TimeoutException(e);
106         }
107     }
108
109     /**
110      * Execute an operation on a remote actor and wait for it's response
111      * @param actor
112      * @param message
113      * @param duration
114      * @return
115      */
116     public Object executeRemoteOperation(ActorSelection actor, Object message,
117         FiniteDuration duration){
118         Future<Object> future =
119             ask(actor, message, new Timeout(duration));
120
121         try {
122             return Await.result(future, AWAIT_DURATION);
123         } catch (Exception e) {
124             throw new TimeoutException(e);
125         }
126     }
127
128     /**
129      * Execute an operation on the primary for a given shard
130      * <p>
131      *     This method first finds the primary for a given shard ,then sends
132      *     the message to the remote shard and waits for a response
133      * </p>
134      * @param shardName
135      * @param message
136      * @param duration
137      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
138      * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
139      *
140      * @return
141      */
142     public Object executeShardOperation(String shardName, Object message, FiniteDuration duration){
143         ActorSelection primary = findPrimary(shardName);
144
145         return executeRemoteOperation(primary, message, duration);
146     }
147
148     public void shutdown() {
149         shardManager.tell(PoisonPill.getInstance(), null);
150         actorSystem.shutdown();
151     }
152 }