Fix followerDistributedDataStore tear down
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeCohortRegistrationProxy.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorRef;
13 import akka.dispatch.OnComplete;
14 import akka.pattern.Patterns;
15 import akka.util.Timeout;
16 import java.util.concurrent.TimeUnit;
17 import org.checkerframework.checker.lock.qual.GuardedBy;
18 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
19 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
20 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
22 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import scala.concurrent.Future;
26 import scala.concurrent.duration.FiniteDuration;
27
28 public class DataTreeCohortRegistrationProxy<C extends DOMDataTreeCommitCohort> extends AbstractObjectRegistration<C> {
29     private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortRegistrationProxy.class);
30     private static final Timeout TIMEOUT = new Timeout(new FiniteDuration(5, TimeUnit.SECONDS));
31
32     private final DOMDataTreeIdentifier subtree;
33     private final ActorRef actor;
34     private final ActorUtils actorUtils;
35     @GuardedBy("this")
36     private ActorRef cohortRegistry;
37
38     DataTreeCohortRegistrationProxy(final ActorUtils actorUtils, final DOMDataTreeIdentifier subtree,
39             final C cohort) {
40         super(cohort);
41         this.subtree = requireNonNull(subtree);
42         this.actorUtils = requireNonNull(actorUtils);
43         actor = actorUtils.getActorSystem().actorOf(DataTreeCohortActor.props(getInstance(),
44                 subtree.path()).withDispatcher(actorUtils.getNotificationDispatcherPath()));
45     }
46
47     public void init(final String shardName) {
48         // FIXME: Add late binding to shard.
49         Future<ActorRef> findFuture = actorUtils.findLocalShardAsync(shardName);
50         findFuture.onComplete(new OnComplete<ActorRef>() {
51             @Override
52             public void onComplete(final Throwable failure, final ActorRef shard) {
53                 if (failure instanceof LocalShardNotFoundException) {
54                     LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} "
55                             + "cannot be registered", shardName, getInstance(), subtree);
56                 } else if (failure != null) {
57                     LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} "
58                             + "cannot be registered", shardName, getInstance(), subtree, failure);
59                 } else {
60                     performRegistration(shard);
61                 }
62             }
63         }, actorUtils.getClientDispatcher());
64     }
65
66     private synchronized void performRegistration(final ActorRef shard) {
67         if (isClosed()) {
68             return;
69         }
70         cohortRegistry = shard;
71         Future<Object> future =
72                 Patterns.ask(shard, new DataTreeCohortActorRegistry.RegisterCohort(subtree, actor), TIMEOUT);
73         future.onComplete(new OnComplete<>() {
74
75             @Override
76             public void onComplete(final Throwable failure, final Object val) {
77                 if (failure != null) {
78                     LOG.error("Unable to register {} as commit cohort", getInstance(), failure);
79                 }
80                 if (isClosed()) {
81                     removeRegistration();
82                 }
83             }
84
85         }, actorUtils.getClientDispatcher());
86     }
87
88     @Override
89     protected synchronized void removeRegistration() {
90         if (cohortRegistry != null) {
91             cohortRegistry.tell(new DataTreeCohortActorRegistry.RemoveCohort(actor), ActorRef.noSender());
92         }
93     }
94 }