e14db0fe6abf34bd222c58c75b495c6cb4b96fba
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeCohortRegistrationProxy.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.dispatch.OnComplete;
12 import akka.pattern.Patterns;
13 import akka.util.Timeout;
14 import com.google.common.base.Preconditions;
15 import java.util.concurrent.TimeUnit;
16 import javax.annotation.concurrent.GuardedBy;
17 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
18 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
19 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
20 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
22 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import scala.concurrent.Future;
26 import scala.concurrent.duration.FiniteDuration;
27
28 public class DataTreeCohortRegistrationProxy<C extends DOMDataTreeCommitCohort> extends AbstractObjectRegistration<C>
29         implements DOMDataTreeCommitCohortRegistration<C> {
30
31     private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortRegistrationProxy.class);
32     private static final Timeout TIMEOUT = new Timeout(new FiniteDuration(5, TimeUnit.SECONDS));
33     private final DOMDataTreeIdentifier subtree;
34     private final ActorRef actor;
35     private final ActorContext actorContext;
36     @GuardedBy("this")
37     private ActorRef cohortRegistry;
38
39     DataTreeCohortRegistrationProxy(final ActorContext actorContext, final DOMDataTreeIdentifier subtree,
40             final C cohort) {
41         super(cohort);
42         this.subtree = Preconditions.checkNotNull(subtree);
43         this.actorContext = Preconditions.checkNotNull(actorContext);
44         this.actor = actorContext.getActorSystem().actorOf(DataTreeCohortActor.props(getInstance(),
45                 subtree.getRootIdentifier()).withDispatcher(actorContext.getNotificationDispatcherPath()));
46     }
47
48     public void init(final String shardName) {
49         // FIXME: Add late binding to shard.
50         Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
51         findFuture.onComplete(new OnComplete<ActorRef>() {
52             @Override
53             public void onComplete(final Throwable failure, final ActorRef shard) {
54                 if (failure instanceof LocalShardNotFoundException) {
55                     LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} "
56                             + "cannot be registered", shardName, getInstance(), subtree);
57                 } else if (failure != null) {
58                     LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} "
59                             + "cannot be registered", shardName, getInstance(), subtree, failure);
60                 } else {
61                     performRegistration(shard);
62                 }
63             }
64         }, actorContext.getClientDispatcher());
65     }
66
67     private synchronized void performRegistration(final ActorRef shard) {
68         if (isClosed()) {
69             return;
70         }
71         cohortRegistry = shard;
72         Future<Object> future =
73                 Patterns.ask(shard, new DataTreeCohortActorRegistry.RegisterCohort(subtree, actor), TIMEOUT);
74         future.onComplete(new OnComplete<Object>() {
75
76             @Override
77             public void onComplete(final Throwable failure, final Object val) {
78                 if (failure != null) {
79                     LOG.error("Unable to register {} as commit cohort", getInstance(), failure);
80                 }
81                 if (isClosed()) {
82                     removeRegistration();
83                 }
84             }
85
86         }, actorContext.getClientDispatcher());
87     }
88
89     @Override
90     protected synchronized void removeRegistration() {
91         if (cohortRegistry != null) {
92             cohortRegistry.tell(new DataTreeCohortActorRegistry.RemoveCohort(actor), ActorRef.noSender());
93         }
94     }
95 }