2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorRef;
13 import akka.dispatch.OnComplete;
14 import akka.pattern.Patterns;
15 import akka.util.Timeout;
16 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
17 import java.util.concurrent.TimeUnit;
18 import org.checkerframework.checker.lock.qual.GuardedBy;
19 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
20 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
22 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
23 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
24 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import scala.concurrent.Future;
28 import scala.concurrent.duration.FiniteDuration;
30 public class DataTreeCohortRegistrationProxy<C extends DOMDataTreeCommitCohort> extends AbstractObjectRegistration<C>
31 implements DOMDataTreeCommitCohortRegistration<C> {
// Logger used for the shard-lookup and registration messages emitted by this class.
33 private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortRegistrationProxy.class);
// Ask timeout of 5 seconds; passed to Patterns.ask when the RegisterCohort request is sent.
34 private static final Timeout TIMEOUT = new Timeout(new FiniteDuration(5, TimeUnit.SECONDS));
// Subtree identifier; null-checked in the constructor and carried in the RegisterCohort message.
35 private final DOMDataTreeIdentifier subtree;
// Actor created in the constructor from DataTreeCohortActor.props(...); this is the reference
// placed in the RegisterCohort and RemoveCohort messages.
36 private final ActorRef actor;
// Source of the actor system, the local-shard lookup and the dispatchers used by this class.
37 private final ActorUtils actorUtils;
// Shard actor the RegisterCohort request was sent to; has no initializer and is assigned in
// performRegistration. In the visible code it is read and written only from synchronized methods.
39 private ActorRef cohortRegistry;
// Constructor: null-checks subtree and actorUtils, then creates the cohort actor in the actor
// system obtained from actorUtils, using DataTreeCohortActor.props(getInstance(),
// subtree.getRootIdentifier()) and the notification dispatcher path.
// NOTE(review): the remainder of the parameter list and anything preceding the first assignment
// are not visible here; getInstance() presumably returns the registered cohort - confirm.
41 DataTreeCohortRegistrationProxy(final ActorUtils actorUtils, final DOMDataTreeIdentifier subtree,
44 this.subtree = requireNonNull(subtree);
45 this.actorUtils = requireNonNull(actorUtils);
46 this.actor = actorUtils.getActorSystem().actorOf(DataTreeCohortActor.props(getInstance(),
47 subtree.getRootIdentifier()).withDispatcher(actorUtils.getNotificationDispatcherPath()));
// Starts registration against the named shard: asynchronously looks up the local shard via
// actorUtils.findLocalShardAsync(shardName) and reacts in a completion callback that runs on
// the client dispatcher. Lookup failures are only logged; nothing is thrown to the caller here.
// NOTE(review): both log messages below say "DataTreeChangeListener" although this class
// registers a commit cohort (see the "commit cohort" message in performRegistration) - this
// looks like copied text; confirm before rewording.
50 public void init(final String shardName) {
51 // FIXME: Add late binding to shard.
52 Future<ActorRef> findFuture = actorUtils.findLocalShardAsync(shardName);
53 findFuture.onComplete(new OnComplete<ActorRef>() {
55 public void onComplete(final Throwable failure, final ActorRef shard) {
// A missing local shard is reported at debug level, without the exception.
56 if (failure instanceof LocalShardNotFoundException) {
57 LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} "
58 + "cannot be registered", shardName, getInstance(), subtree);
// Any other lookup failure is reported at error level together with the exception.
59 } else if (failure != null) {
60 LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} "
61 + "cannot be registered", shardName, getInstance(), subtree, failure);
// NOTE(review): presumably reached only when failure is null; the branch delimiter is not
// visible in this view - confirm.
63 performRegistration(shard);
66 }, actorUtils.getClientDispatcher());
// Remembers the given shard in cohortRegistry and asks it, bounded by TIMEOUT, to register this
// proxy's actor for subtree by sending a RegisterCohort message. Synchronized on this instance.
// The SpotBugs suppression refers to the linked issue; the visible caller is the anonymous
// callback in init, so the "uncalled private method" report is presumably a false positive.
// NOTE(review): statements between the method header and the cohortRegistry assignment are
// not visible in this view - confirm against the full source.
69 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
70 justification = "https://github.com/spotbugs/spotbugs/issues/811")
71 private synchronized void performRegistration(final ActorRef shard) {
75 cohortRegistry = shard;
76 Future<Object> future =
77 Patterns.ask(shard, new DataTreeCohortActorRegistry.RegisterCohort(subtree, actor), TIMEOUT);
// Completion callback runs on the client dispatcher; the reply value is not used in the
// visible code.
78 future.onComplete(new OnComplete<Object>() {
81 public void onComplete(final Throwable failure, final Object val) {
// A failed ask is logged at error level with the exception.
82 if (failure != null) {
83 LOG.error("Unable to register {} as commit cohort", getInstance(), failure);
// NOTE(review): the remainder of this callback is not visible in this view - confirm what
// else happens after the failure check.
90 }, actorUtils.getClientDispatcher());
94 protected synchronized void removeRegistration() {
95 if (cohortRegistry != null) {
96 cohortRegistry.tell(new DataTreeCohortActorRegistry.RemoveCohort(actor), ActorRef.noSender());