Complete implementation of ThreePhaseCommitCohortProxy
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.util.Timeout;
16 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
17 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
18 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
19 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22 import scala.concurrent.Await;
23 import scala.concurrent.Future;
24 import scala.concurrent.duration.Duration;
25 import scala.concurrent.duration.FiniteDuration;
26
27 import java.util.concurrent.TimeUnit;
28
29 import static akka.pattern.Patterns.ask;
30
31 /**
32  * The ActorContext class contains utility methods which could be used by
33  * non-actors (like DistributedDataStore) to work with actors a little more
34  * easily. An ActorContext can be freely passed around to local object instances
35  * but should not be passed to actors especially remote actors
36  */
37 public class ActorContext {
38     private static final Logger
39         LOG = LoggerFactory.getLogger(ActorContext.class);
40
41     public static final FiniteDuration ASK_DURATION = Duration.create(5, TimeUnit.SECONDS);
42     public static final Duration AWAIT_DURATION = Duration.create(5, TimeUnit.SECONDS);
43
44     private final ActorSystem actorSystem;
45     private final ActorRef shardManager;
46
47     public ActorContext(ActorSystem actorSystem, ActorRef shardManager){
48         this.actorSystem = actorSystem;
49         this.shardManager = shardManager;
50     }
51
52     public ActorSystem getActorSystem() {
53         return actorSystem;
54     }
55
56     public ActorRef getShardManager() {
57         return shardManager;
58     }
59
60     public ActorSelection actorSelection(String actorPath){
61         return actorSystem.actorSelection(actorPath);
62     }
63
64     public ActorSelection actorSelection(ActorPath actorPath){
65         return actorSystem.actorSelection(actorPath);
66     }
67
68
69     /**
70      * Finds the primary for a given shard
71      *
72      * @param shardName
73      * @return
74      */
75     public ActorSelection findPrimary(String shardName) {
76         Object result = executeLocalOperation(shardManager,
77             new FindPrimary(shardName), ASK_DURATION);
78
79         if(result instanceof PrimaryFound){
80             PrimaryFound found = (PrimaryFound) result;
81
82             LOG.error("Primary found {}", found.getPrimaryPath());
83
84             return actorSystem.actorSelection(found.getPrimaryPath());
85         }
86         throw new PrimaryNotFoundException();
87     }
88
89     /**
90      * Executes an operation on a local actor and wait for it's response
91      * @param actor
92      * @param message
93      * @param duration
94      * @return The response of the operation
95      */
96     public Object executeLocalOperation(ActorRef actor, Object message,
97         FiniteDuration duration){
98         Future<Object> future =
99             ask(actor, message, new Timeout(duration));
100
101         try {
102             return Await.result(future, AWAIT_DURATION);
103         } catch (Exception e) {
104             throw new TimeoutException(e);
105         }
106     }
107
108     /**
109      * Execute an operation on a remote actor and wait for it's response
110      * @param actor
111      * @param message
112      * @param duration
113      * @return
114      */
115     public Object executeRemoteOperation(ActorSelection actor, Object message,
116         FiniteDuration duration){
117         Future<Object> future =
118             ask(actor, message, new Timeout(duration));
119
120         try {
121             return Await.result(future, AWAIT_DURATION);
122         } catch (Exception e) {
123             throw new TimeoutException(e);
124         }
125     }
126
127     /**
128      * Execute an operation on the primary for a given shard
129      * <p>
130      *     This method first finds the primary for a given shard ,then sends
131      *     the message to the remote shard and waits for a response
132      * </p>
133      * @param shardName
134      * @param message
135      * @param duration
136      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
137      * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
138      *
139      * @return
140      */
141     public Object executeShardOperation(String shardName, Object message, FiniteDuration duration){
142         ActorSelection primary = findPrimary(shardName);
143
144         return executeRemoteOperation(primary, message, duration);
145     }
146
147 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.