Bug 7407 - CDS: allow applications to request Leader movement 43/53543/27
authorJakub Morvay <jmorvay@cisco.com>
Mon, 6 Mar 2017 12:54:31 +0000 (13:54 +0100)
committerTom Pantelis <tompantelis@gmail.com>
Sat, 1 Apr 2017 08:36:45 +0000 (08:36 +0000)
This patch provides the routing from cds-dom-api CDSShardAccess
to the backend RaftActor.

Change-Id: I9fa315034d95a1896393a6152147a7bc50829b2a
Signed-off-by: Jakub Morvay <jmorvay@cisco.com>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/LeadershipTransferFailedException.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImplTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActorTest.java [new file with mode: 0644]

index ce4cfcd9085cd1ce3b7983df695249dcdaa8341d..9d971d50364996dcf8f718b0f7d8679d8b409189 100644 (file)
@@ -14,4 +14,8 @@ public class LeadershipTransferFailedException extends Exception {
     public LeadershipTransferFailedException(final String message) {
         super(message);
     }
+
+    public LeadershipTransferFailedException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImpl.java
new file mode 100644 (file)
index 0000000..bc90716
--- /dev/null
@@ -0,0 +1,218 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.sharding;
+
+import static akka.actor.ActorRef.noSender;
+
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.dispatch.Futures;
+import akka.dispatch.Mapper;
+import akka.dispatch.OnComplete;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
+import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.dom.api.CDSShardAccess;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocation;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocationListenerRegistration;
+import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.compat.java8.FutureConverters;
+import scala.concurrent.Future;
+
+
+/**
+ * Default {@link CDSShardAccess} implementation. Listens on leader location
+ * change events and distributes them to registered listeners. Also updates
+ * current information about leader location accordingly.
+ *
+ * <p>
+ * Sends {@link MakeLeaderLocal} message to local shards and translates its result
+ * on behalf users {@link #makeLeaderLocal()} calls.
+ *
+ * <p>
+ * {@link org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer} that
+ * creates instances of this class has to call {@link #close()} once it is no
+ * longer valid.
+ */
+final class CDSShardAccessImpl implements CDSShardAccess, LeaderLocationListener, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(CDSShardAccessImpl.class);
+
+    private final Collection<LeaderLocationListener> listeners = ConcurrentHashMap.newKeySet();
+    private final DOMDataTreeIdentifier prefix;
+    private final ActorContext actorContext;
+    private final Timeout makeLeaderLocalTimeout;
+
+    private ActorRef roleChangeListenerActor;
+
+    private volatile LeaderLocation currentLeader = LeaderLocation.UNKNOWN;
+    private volatile boolean closed = false;
+
+    CDSShardAccessImpl(final DOMDataTreeIdentifier prefix, final ActorContext actorContext) {
+        this.prefix = Preconditions.checkNotNull(prefix);
+        this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.makeLeaderLocalTimeout =
+                new Timeout(actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().$times(2));
+
+        // register RoleChangeListenerActor
+        // TODO Maybe we should do this in async
+        final Optional<ActorRef> localShardReply =
+                actorContext.findLocalShard(ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
+        Preconditions.checkState(localShardReply.isPresent(),
+                "Local shard for {} not present. Cannot register RoleChangeListenerActor", prefix);
+        roleChangeListenerActor =
+                actorContext.getActorSystem().actorOf(RoleChangeListenerActor.props(localShardReply.get(), this));
+    }
+
+    private void checkNotClosed() {
+        Preconditions.checkState(!closed,
+                "CDSDataTreeProducer, that this CDSShardAccess is associated with, is no longer valid");
+    }
+
+    @Override
+    @Nonnull
+    public DOMDataTreeIdentifier getShardIdentifier() {
+        checkNotClosed();
+        return prefix;
+    }
+
+    @Override
+    @Nonnull
+    public LeaderLocation getLeaderLocation() {
+        checkNotClosed();
+        // TODO before getting first notification from roleChangeListenerActor
+        // we will always return UNKNOWN
+        return currentLeader;
+    }
+
+    @Override
+    @Nonnull
+    public CompletionStage<Void> makeLeaderLocal() {
+        // TODO when we have running make leader local operation
+        // we should just return the same completion stage
+        checkNotClosed();
+
+        // TODO can we cache local shard actorRef?
+        final Future<ActorRef> localShardReply =
+                actorContext.findLocalShardAsync(ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
+
+        // we have to tell local shard to make leader local
+        final scala.concurrent.Promise<Object> makeLeaderLocalAsk = Futures.promise();
+        localShardReply.onComplete(new OnComplete<ActorRef>() {
+            @Override
+            public void onComplete(final Throwable failure, final ActorRef actorRef) throws Throwable {
+                if (failure instanceof LocalShardNotFoundException) {
+                    LOG.debug("No local shard found for {} - Cannot request leadership transfer to local shard.",
+                            getShardIdentifier(), failure);
+                    makeLeaderLocalAsk.failure(failure);
+                } else if (failure != null) {
+                    // TODO should this be WARN?
+                    LOG.debug("Failed to find local shard for {} - Cannot request leadership transfer to local shard.",
+                            getShardIdentifier(), failure);
+                    makeLeaderLocalAsk.failure(failure);
+                } else {
+                    makeLeaderLocalAsk
+                            .completeWith(actorContext
+                                    .executeOperationAsync(actorRef, MakeLeaderLocal.INSTANCE, makeLeaderLocalTimeout));
+                }
+            }
+        }, actorContext.getClientDispatcher());
+
+        // we have to transform make leader local request result
+        Future<Void> makeLeaderLocalFuture = makeLeaderLocalAsk.future()
+                .transform(new Mapper<Object, Void>() {
+                    @Override
+                    public Void apply(final Object parameter) {
+                        return null;
+                    }
+                }, new Mapper<Throwable, Throwable>() {
+                    @Override
+                    public Throwable apply(final Throwable parameter) {
+                        if (parameter instanceof LeadershipTransferFailedException) {
+                            // do nothing with exception and just pass it as it is
+                            return parameter;
+                        }
+                        // wrap exception in LeadershipTransferFailedEx
+                        return new LeadershipTransferFailedException("Leadership transfer failed", parameter);
+                    }
+                }, actorContext.getClientDispatcher());
+
+        return FutureConverters.toJava(makeLeaderLocalFuture);
+    }
+
+    @Override
+    @Nonnull
+    public <L extends LeaderLocationListener> LeaderLocationListenerRegistration<L>
+            registerLeaderLocationListener(@Nonnull final L listener) {
+        checkNotClosed();
+        Preconditions.checkNotNull(listener);
+        Preconditions.checkArgument(!listeners.contains(listener),
+                "Listener {} is already registered with ShardAccess {}", listener, this);
+
+        LOG.debug("Registering LeaderLocationListener {}", listener);
+
+        listeners.add(listener);
+
+        return new LeaderLocationListenerRegistration<L>() {
+            @Override
+            public L getInstance() {
+                return listener;
+            }
+
+            @Override
+            public void close() {
+                listeners.remove(listener);
+            }
+        };
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public void onLeaderLocationChanged(@Nonnull final LeaderLocation location) {
+        if (closed) {
+            // we are closed already. Do not dispatch any new leader location
+            // change events.
+            return;
+        }
+
+        LOG.debug("Received leader location change notification. New leader location: {}", location);
+        currentLeader = location;
+        listeners.forEach(listener -> {
+            try {
+                listener.onLeaderLocationChanged(location);
+            } catch (Exception e) {
+                LOG.warn("Ignoring uncaught exception thrown be LeaderLocationListener {} "
+                        + "during processing leader location change {}", listener, location, e);
+            }
+        });
+    }
+
+    @Override
+    public void close() {
+        // TODO should we also remove all listeners?
+        LOG.debug("Closing {} ShardAccess", prefix);
+        closed = true;
+
+        if (roleChangeListenerActor != null) {
+            // stop RoleChangeListenerActor
+            roleChangeListenerActor.tell(PoisonPill.getInstance(), noSender());
+            roleChangeListenerActor = null;
+        }
+    }
+}
index 9726a986c15074f9838cbb8f38fa84f8860dcbe4..df49e33cdd09d74f295c6922af03974081edb5c5 100644 (file)
@@ -32,7 +32,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumMap;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CompletionStage;
@@ -54,6 +56,8 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer;
+import org.opendaylight.controller.cluster.dom.api.CDSShardAccess;
 import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator;
 import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
 import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
@@ -611,13 +615,13 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
             final Future<Void> closeFuture = ask.transform(
                     new Mapper<Object, Void>() {
                         @Override
-                        public Void apply(Object parameter) {
+                        public Void apply(final Object parameter) {
                             return null;
                         }
                     },
                     new Mapper<Throwable, Throwable>() {
                         @Override
-                        public Throwable apply(Throwable throwable) {
+                        public Throwable apply(final Throwable throwable) {
                             return throwable;
                         }
                     }, actorSystem.dispatcher());
@@ -626,12 +630,16 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
         }
     }
 
-    private static final class ProxyProducer extends ForwardingObject implements DOMDataTreeProducer {
+    // TODO what about producers created by this producer?
+    // They should also be CDSProducers
+    private static final class ProxyProducer extends ForwardingObject implements CDSDataTreeProducer {
 
         private final DOMDataTreeProducer delegate;
         private final Collection<DOMDataTreeIdentifier> subtrees;
         private final ActorRef shardDataTreeActor;
         private final ActorContext actorContext;
+        @GuardedBy("shardAccessMap")
+        private final Map<DOMDataTreeIdentifier, CDSShardAccessImpl> shardAccessMap = new HashMap<>();
 
         ProxyProducer(final DOMDataTreeProducer delegate,
                       final Collection<DOMDataTreeIdentifier> subtrees,
@@ -658,8 +666,12 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
         }
 
         @Override
+        @SuppressWarnings("checkstyle:IllegalCatch")
         public void close() throws DOMDataTreeProducerException {
             delegate.close();
+            synchronized (shardAccessMap) {
+                shardAccessMap.values().forEach(CDSShardAccessImpl::close);
+            }
 
             final Object o = actorContext.executeOperation(shardDataTreeActor, new ProducerRemoved(subtrees));
             if (o instanceof DOMDataTreeProducerException) {
@@ -673,5 +685,24 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
         protected DOMDataTreeProducer delegate() {
             return delegate;
         }
+
+        @Nonnull
+        @Override
+        public CDSShardAccess getShardAccess(@Nonnull final DOMDataTreeIdentifier subtree) {
+            synchronized (shardAccessMap) {
+                Preconditions.checkArgument(subtrees.contains(subtree),
+                        "Subtree {} is not controlled by this producer {}", subtree, this);
+                if (shardAccessMap.get(subtree) != null) {
+                    return shardAccessMap.get(subtree);
+                }
+
+                // TODO Maybe we can have static factory method and return the same instance
+                // for same subtrees. But maybe it is not needed since there can be only one
+                // producer attached to some subtree at a time. And also how we can close ShardAccess
+                // then
+                final CDSShardAccessImpl shardAccess = new CDSShardAccessImpl(subtree, actorContext);
+                return shardAccessMap.put(subtree, shardAccess);
+            }
+        }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActor.java
new file mode 100644 (file)
index 0000000..38b69e7
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocation;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
+
+/**
+ * Proxy actor which acts as a facade for user-provided
+ * {@link LeaderLocationListener}. It subscribes for {@link LeaderStateChanged}
+ * notifications in its pre start hook and translates them to
+ * {@link LeaderLocationListener#onLeaderLocationChanged(LeaderLocation)}
+ * events.
+ */
+public class RoleChangeListenerActor extends AbstractUntypedActor {
+    private final LeaderLocationListener leaderLocationListener;
+    private final ActorRef roleChangeNotifier;
+
+    private RoleChangeListenerActor(final ActorRef roleChangeNotifier, final LeaderLocationListener listener) {
+        this.roleChangeNotifier = Preconditions.checkNotNull(roleChangeNotifier);
+        this.leaderLocationListener = Preconditions.checkNotNull(listener);
+    }
+
+    @Override
+    public void preStart() throws Exception {
+        super.preStart();
+        roleChangeNotifier.tell(new RegisterRoleChangeListener(), getSelf());
+    }
+
+    @Override
+    protected void handleReceive(final Object message) throws Exception {
+        if (message instanceof RoleChangeNotification) {
+            ignoreMessage(message);
+        } else if (message instanceof LeaderStateChanged) {
+            onLeaderStateChanged((LeaderStateChanged) message);
+        } else {
+            unknownMessage(message);
+        }
+    }
+
+    private void onLeaderStateChanged(final LeaderStateChanged message) {
+        final LeaderLocation newLocation;
+        if (message.getLeaderId() == null) {
+            newLocation = LeaderLocation.UNKNOWN;
+        } else if (message.getMemberId().equals(message.getLeaderId())) {
+            newLocation = LeaderLocation.LOCAL;
+        } else {
+            newLocation = LeaderLocation.REMOTE;
+        }
+
+        // TODO should we wrap this in try catch block?
+        leaderLocationListener.onLeaderLocationChanged(newLocation);
+    }
+
+    public static Props props(final ActorRef roleChangeNotifier, final LeaderLocationListener listener) {
+        return Props.create(RoleChangeListenerActor.class, roleChangeNotifier, listener);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImplTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImplTest.java
new file mode 100644 (file)
index 0000000..cbb0726
--- /dev/null
@@ -0,0 +1,217 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.sharding;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import akka.actor.ActorRef;
+import akka.dispatch.Futures;
+import com.google.common.base.Optional;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocation;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocationListenerRegistration;
+import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+public class CDSShardAccessImplTest extends AbstractActorTest {
+
+    private static final DOMDataTreeIdentifier TEST_ID =
+            new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+
+    private CDSShardAccessImpl shardAccess;
+    private ActorContext context;
+
+    @Before
+    public void setUp() {
+        context = mock(ActorContext.class);
+        final DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
+        doReturn(Optional.of(getSystem().deadLetters())).when(context).findLocalShard(any());
+        doReturn(datastoreContext).when(context).getDatastoreContext();
+        doReturn(getSystem()).when(context).getActorSystem();
+        shardAccess = new CDSShardAccessImpl(TEST_ID, context);
+    }
+
+    @Test
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public void testRegisterLeaderLocationListener() {
+        final LeaderLocationListener listener1 = mock(LeaderLocationListener.class);
+
+        // first registration should be OK
+        shardAccess.registerLeaderLocationListener(listener1);
+
+        // second registration should fail with IllegalArgumentEx
+        try {
+            shardAccess.registerLeaderLocationListener(listener1);
+            fail("Should throw exception");
+        } catch (final Exception e) {
+            assertTrue(e instanceof IllegalArgumentException);
+        }
+
+        // null listener registration should fail with NPE
+        try {
+            shardAccess.registerLeaderLocationListener(null);
+            fail("Should throw exception");
+        } catch (final Exception e) {
+            assertTrue(e instanceof NullPointerException);
+        }
+
+        // registering listener on closed shard access should fail with IllegalStateEx
+        final LeaderLocationListener listener2 = mock(LeaderLocationListener.class);
+        shardAccess.close();
+        try {
+            shardAccess.registerLeaderLocationListener(listener2);
+            fail("Should throw exception");
+        } catch (final Exception ex) {
+            assertTrue(ex instanceof IllegalStateException);
+        }
+    }
+
+    @Test
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public void testOnLeaderLocationChanged() {
+        final LeaderLocationListener listener1 = mock(LeaderLocationListener.class);
+        doThrow(new RuntimeException("Failed")).when(listener1).onLeaderLocationChanged(any());
+        final LeaderLocationListener listener2 = mock(LeaderLocationListener.class);
+        doNothing().when(listener2).onLeaderLocationChanged(any());
+        final LeaderLocationListener listener3 = mock(LeaderLocationListener.class);
+        doNothing().when(listener3).onLeaderLocationChanged(any());
+
+        final LeaderLocationListenerRegistration reg1 = shardAccess.registerLeaderLocationListener(listener1);
+        final LeaderLocationListenerRegistration reg2 = shardAccess.registerLeaderLocationListener(listener2);
+        final LeaderLocationListenerRegistration reg3 = shardAccess.registerLeaderLocationListener(listener3);
+
+        // Error in listener1 should not affect dispatching change to other listeners
+        shardAccess.onLeaderLocationChanged(LeaderLocation.LOCAL);
+        verify(listener1).onLeaderLocationChanged(eq(LeaderLocation.LOCAL));
+        verify(listener2).onLeaderLocationChanged(eq(LeaderLocation.LOCAL));
+        verify(listener3).onLeaderLocationChanged(eq(LeaderLocation.LOCAL));
+
+        // Closed listeners shouldn't see new leader location changes
+        reg1.close();
+        reg2.close();
+        shardAccess.onLeaderLocationChanged(LeaderLocation.REMOTE);
+        verify(listener3).onLeaderLocationChanged(eq(LeaderLocation.REMOTE));
+        verifyNoMoreInteractions(listener1);
+        verifyNoMoreInteractions(listener2);
+
+        // Closed shard access should not dispatch any new events
+        shardAccess.close();
+        shardAccess.onLeaderLocationChanged(LeaderLocation.UNKNOWN);
+        verifyNoMoreInteractions(listener1);
+        verifyNoMoreInteractions(listener2);
+        verifyNoMoreInteractions(listener3);
+
+        reg3.close();
+    }
+
+    @Test
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public void testGetShardIdentifier() {
+        assertEquals(shardAccess.getShardIdentifier(), TEST_ID);
+
+        // closed shard access should throw illegal state
+        shardAccess.close();
+        try {
+            shardAccess.getShardIdentifier();
+            fail("Exception expected");
+        } catch (final Exception e) {
+            assertTrue(e instanceof IllegalStateException);
+        }
+    }
+
+    @Test
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public void testGetLeaderLocation() {
+        // new shard access does not know anything about leader location
+        assertEquals(shardAccess.getLeaderLocation(), LeaderLocation.UNKNOWN);
+
+        // we start getting leader location changes notifications
+        shardAccess.onLeaderLocationChanged(LeaderLocation.LOCAL);
+        assertEquals(shardAccess.getLeaderLocation(), LeaderLocation.LOCAL);
+
+        shardAccess.onLeaderLocationChanged(LeaderLocation.REMOTE);
+        shardAccess.onLeaderLocationChanged(LeaderLocation.UNKNOWN);
+        assertEquals(shardAccess.getLeaderLocation(), LeaderLocation.UNKNOWN);
+
+        // closed shard access throws illegal state
+        shardAccess.close();
+        try {
+            shardAccess.getLeaderLocation();
+            fail("Should have failed with IllegalStateEx");
+        } catch (Exception e) {
+            assertTrue(e instanceof IllegalStateException);
+        }
+    }
+
+    @Test
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public void testMakeLeaderLocal() throws Exception {
+        final FiniteDuration timeout = new FiniteDuration(5, TimeUnit.SECONDS);
+        final ActorRef localShardRef = mock(ActorRef.class);
+        final Future<ActorRef> localShardRefFuture = Futures.successful(localShardRef);
+        doReturn(localShardRefFuture).when(context).findLocalShardAsync(any());
+
+        // MakeLeaderLocal will reply with success
+        doReturn(Futures.successful(null)).when(context).executeOperationAsync((ActorRef) any(), any(), any());
+        doReturn(getSystem().dispatcher()).when(context).getClientDispatcher();
+        assertEquals(waitOnAsyncTask(shardAccess.makeLeaderLocal(), timeout), null);
+
+        // MakeLeaderLocal will reply with failure
+        doReturn(Futures.failed(new LeadershipTransferFailedException("Failure")))
+                .when(context).executeOperationAsync((ActorRef) any(), any(), any());
+
+        try {
+            waitOnAsyncTask(shardAccess.makeLeaderLocal(), timeout);
+            fail("makeLeaderLocal operation should not be successful");
+        } catch (final Exception e) {
+            assertTrue(e instanceof LeadershipTransferFailedException);
+        }
+
+        // we don't even find local shard
+        doReturn(Futures.failed(new LocalShardNotFoundException("Local shard not found")))
+                .when(context).findLocalShardAsync(any());
+
+        try {
+            waitOnAsyncTask(shardAccess.makeLeaderLocal(), timeout);
+            fail("makeLeaderLocal operation should not be successful");
+        } catch (final Exception e) {
+            assertTrue(e instanceof LeadershipTransferFailedException);
+            assertTrue(e.getCause() instanceof LocalShardNotFoundException);
+        }
+
+        // closed shard access should throw IllegalStateEx
+        shardAccess.close();
+        try {
+            shardAccess.makeLeaderLocal();
+            fail("Should have thrown IllegalStateEx. ShardAccess is closed");
+        } catch (final Exception e) {
+            assertTrue(e instanceof IllegalStateException);
+        }
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActorTest.java
new file mode 100644 (file)
index 0000000..978cc27
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import static akka.actor.ActorRef.noSender;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocation;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+
+public class RoleChangeListenerActorTest extends AbstractActorTest {
+
+    @Test
+    public void testRegisterRoleChangeListenerOnStart() {
+        new JavaTestKit(getSystem()) {
+            {
+                final LeaderLocationListener listener = mock(LeaderLocationListener.class);
+                final Props props = RoleChangeListenerActor.props(getRef(), listener);
+
+                getSystem().actorOf(props, "testRegisterRoleChangeListenerOnStart");
+                expectMsgClass(RegisterRoleChangeListener.class);
+            }
+        };
+    }
+
+    @Test
+    public void testOnDataTreeChanged() {
+        final LeaderLocationListener listener = mock(LeaderLocationListener.class);
+        doNothing().when(listener).onLeaderLocationChanged(any());
+        final Props props = RoleChangeListenerActor.props(getSystem().deadLetters(), listener);
+
+        final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedChanged");
+
+        subject.tell(new LeaderStateChanged("member-1", null, (short) 0), noSender());
+        verify(listener, timeout(5000)).onLeaderLocationChanged(eq(LeaderLocation.UNKNOWN));
+
+        subject.tell(new LeaderStateChanged("member-1", "member-1", (short) 0), noSender());
+        verify(listener, timeout(5000)).onLeaderLocationChanged(eq(LeaderLocation.LOCAL));
+
+        subject.tell(new LeaderStateChanged("member-1", "member-2", (short) 0), noSender());
+        verify(listener, timeout(5000)).onLeaderLocationChanged(eq(LeaderLocation.REMOTE));
+
+    }
+}
\ No newline at end of file