BUG-5280: remove support for talking to pre-Boron clients
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeCohortRegistrationProxy.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.dispatch.OnComplete;
13 import akka.pattern.Patterns;
14 import akka.util.Timeout;
15 import com.google.common.base.Preconditions;
16 import java.util.concurrent.TimeUnit;
17 import javax.annotation.concurrent.GuardedBy;
18 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
19 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
20 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
22 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
23 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import scala.concurrent.Future;
27 import scala.concurrent.duration.FiniteDuration;
28
29 public class DataTreeCohortRegistrationProxy<C extends DOMDataTreeCommitCohort> extends AbstractObjectRegistration<C>
30         implements DOMDataTreeCommitCohortRegistration<C> {
31
32     private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortRegistrationProxy.class);
33     private static final Timeout TIMEOUT = new Timeout(new FiniteDuration(5, TimeUnit.SECONDS));
34     private final DOMDataTreeIdentifier subtree;
35     private final ActorRef actor;
36     private final ActorContext actorContext;
37     @GuardedBy("this")
38     private ActorRef cohortRegistry;
39
40
41     DataTreeCohortRegistrationProxy(ActorContext actorContext, DOMDataTreeIdentifier subtree, C cohort) {
42         super(cohort);
43         this.subtree = Preconditions.checkNotNull(subtree);
44         this.actorContext = Preconditions.checkNotNull(actorContext);
45         this.actor = actorContext.getActorSystem().actorOf(
46                 DataTreeCohortActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath()));
47     }
48
49
50     public void init(String shardName) {
51         // FIXME: Add late binding to shard.
52         Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
53         findFuture.onComplete(new OnComplete<ActorRef>() {
54             @Override
55             public void onComplete(final Throwable failure, final ActorRef shard) {
56                 if (failure instanceof LocalShardNotFoundException) {
57                     LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} "
58                             + "cannot be registered", shardName, getInstance(), subtree);
59                 } else if (failure != null) {
60                     LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} "
61                             + "cannot be registered: {}", shardName, getInstance(), subtree, failure);
62                 } else {
63                     performRegistration(shard);
64                 }
65             }
66         }, actorContext.getClientDispatcher());
67     }
68
69     private synchronized void performRegistration(ActorRef shard) {
70         if (isClosed()) {
71             return;
72         }
73         cohortRegistry = shard;
74         Future<Object> future =
75                 Patterns.ask(shard, new DataTreeCohortActorRegistry.RegisterCohort(subtree, actor), TIMEOUT);
76         future.onComplete(new OnComplete<Object>() {
77
78             @Override
79             public void onComplete(Throwable e, Object val) throws Throwable {
80                 if (e != null) {
81                     LOG.error("Unable to register {} as commit cohort", getInstance(), e);
82                 }
83                 if (isClosed()) {
84                     removeRegistration();
85                 }
86             }
87
88         }, actorContext.getClientDispatcher());
89     }
90
91     @Override
92     protected synchronized void removeRegistration() {
93         if (cohortRegistry != null) {
94             cohortRegistry.tell(new DataTreeCohortActorRegistry.RemoveCohort(actor), ActorRef.noSender());
95         }
96     }
97 }