Merge "Fix to karaf-parent to copy in dependencies."
authorMoiz Raja <moraja@cisco.com>
Wed, 4 Mar 2015 22:11:49 +0000 (22:11 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 4 Mar 2015 22:11:49 +0000 (22:11 +0000)
19 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties

index 3e6742c17d37c178d30c7d570490ff993c068806..d4d13899eb770e4ee8bb88a634dd85ec2cfb7bbc 100644 (file)
@@ -43,6 +43,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
     private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
     private long isolatedLeaderCheckInterval = HEART_BEAT_INTERVAL.$times(1000).toMillis();
+    private FiniteDuration electionTimeOutInterval;
 
     // 12 is just an arbitrary percentage. This is the amount of the total memory that a raft actor's
     // in-memory journal can use before it needs to snapshot
@@ -52,6 +53,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
         this.heartBeatInterval = heartBeatInterval;
+        electionTimeOutInterval = null;
     }
 
     public void setSnapshotBatchCount(long snapshotBatchCount) {
@@ -72,6 +74,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     public void setElectionTimeoutFactor(long electionTimeoutFactor){
         this.electionTimeoutFactor = electionTimeoutFactor;
+        electionTimeOutInterval = null;
     }
 
     @Override
@@ -92,7 +95,11 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     @Override
     public FiniteDuration getElectionTimeOutInterval() {
-        return getHeartBeatInterval().$times(electionTimeoutFactor);
+        if(electionTimeOutInterval == null) {
+            electionTimeOutInterval = getHeartBeatInterval().$times(electionTimeoutFactor);
+        }
+
+        return electionTimeOutInterval;
     }
 
     @Override
index 04b9f163f4b6ad7be0f423e485902f7b3d5ccb79..90e128256132437c5f2acd4cb861e0e74a759d44 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.raft;
 import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import scala.concurrent.duration.FiniteDuration;
 
 public class FollowerLogInformationImpl implements FollowerLogInformation {
     private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex");
@@ -21,18 +20,17 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
 
-    private final long followerTimeoutMillis;
+    private final RaftActorContext context;
 
     private volatile long nextIndex;
 
     private volatile long matchIndex;
 
-    public FollowerLogInformationImpl(String id, long nextIndex,
-        long matchIndex, FiniteDuration followerTimeoutDuration) {
+    public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) {
         this.id = id;
-        this.nextIndex = nextIndex;
+        this.nextIndex = context.getCommitIndex();
         this.matchIndex = matchIndex;
-        this.followerTimeoutMillis = followerTimeoutDuration.toMillis();
+        this.context = context;
     }
 
     @Override
@@ -78,7 +76,8 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
     @Override
     public boolean isFollowerActive() {
         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
-        return (stopwatch.isRunning()) && (elapsed <= followerTimeoutMillis);
+        return (stopwatch.isRunning()) &&
+                (elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis());
     }
 
     @Override
@@ -107,7 +106,8 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
         builder.append("FollowerLogInformationImpl [id=").append(id).append(", nextIndex=").append(nextIndex)
                 .append(", matchIndex=").append(matchIndex).append(", stopwatch=")
                 .append(stopwatch.elapsed(TimeUnit.MILLISECONDS))
-                .append(", followerTimeoutMillis=").append(followerTimeoutMillis).append("]");
+                .append(", followerTimeoutMillis=")
+                .append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]");
         return builder.toString();
     }
 
index cb1b42aa1b1fb95ced067fc152e0760a3ce0576f..ec3f375bdeb0fb940e4d08d14397ebd03f957853 100644 (file)
@@ -99,7 +99,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * This context should NOT be passed directly to any other actor it is
      * only to be consumed by the RaftActorBehaviors
      */
-    private final RaftActorContext context;
+    private final RaftActorContextImpl context;
 
     /**
      * The in-memory journal
@@ -140,6 +140,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         super.preStart();
     }
 
+    @Override
+    public void postStop() {
+        if(currentBehavior != null) {
+            try {
+                currentBehavior.close();
+            } catch (Exception e) {
+                LOG.debug("{}: Error closing behavior {}", persistenceId(), currentBehavior.state());
+            }
+        }
+
+        super.postStop();
+    }
+
     @Override
     public void handleRecover(Object message) {
         if(persistence().isRecoveryApplicable()) {
@@ -515,6 +528,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return context;
     }
 
+    protected void updateConfigParams(ConfigParams configParams) {
+        context.setConfigParams(configParams);
+    }
+
     /**
      * setPeerAddress sets the address of a known peer at a later time.
      * <p>
index b71b3be3522e1082e46f56f681a0ee49440ab79d..6fc5e4369bb4879e1e8dd2ca01f92b98d5ec4ba4 100644 (file)
@@ -37,7 +37,7 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private final Logger LOG;
 
-    private final ConfigParams configParams;
+    private ConfigParams configParams;
 
     private boolean snapshotCaptureInitiated;
 
@@ -59,6 +59,10 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.LOG = logger;
     }
 
+    void setConfigParams(ConfigParams configParams) {
+        this.configParams = configParams;
+    }
+
     @Override
     public ActorRef actorOf(Props props){
         return context.actorOf(props);
index be51ba069cc5056636646566d1db00b30154073a..890d45e8fb664b20a4f27e8996d38b15e27a0120 100644 (file)
@@ -97,9 +97,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
         for (String followerId : context.getPeerAddresses().keySet()) {
             FollowerLogInformation followerLogInformation =
-                new FollowerLogInformationImpl(followerId,
-                    context.getCommitIndex(), -1,
-                    context.getConfigParams().getElectionTimeOutInterval());
+                new FollowerLogInformationImpl(followerId, -1, context);
 
             ftlBuilder.put(followerId, followerLogInformation);
         }
index 84d1545a65e3302462b69f9ee96f8dd982955654..5be9030f5957bd361c73ead2d11353eb5d178f95 100644 (file)
@@ -7,29 +7,29 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.TimeUnit;
 import org.junit.Test;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 public class FollowerLogInformationImplTest {
 
     @Test
     public void testIsFollowerActive() {
 
-        FiniteDuration timeoutDuration =
-            new FiniteDuration(500, TimeUnit.MILLISECONDS);
-
-        FollowerLogInformation followerLogInformation =
-            new FollowerLogInformationImpl(
-                "follower1", 10, 9, timeoutDuration);
+        MockRaftActorContext context = new MockRaftActorContext();
+        context.setCommitIndex(10);
 
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
+        configParams.setElectionTimeoutFactor(1);
+        context.setConfigParams(configParams);
 
+        FollowerLogInformation followerLogInformation =
+            new FollowerLogInformationImpl("follower1", 9, context);
 
         assertFalse("Follower should be termed inactive before stopwatch starts",
             followerLogInformation.isFollowerActive());
index 297d781251cc69854363800171599dcdc2bdc1c7..c3161a592feb979b11b6032c735fb41a90ffa574 100644 (file)
@@ -37,22 +37,7 @@ public class MockRaftActorContext implements RaftActorContext {
     private boolean snapshotCaptureInitiated;
 
     public MockRaftActorContext(){
-        electionTerm = null;
-
-        initReplicatedLog();
-    }
-
-    public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){
-        this.id = id;
-        this.system = system;
-        this.actor = actor;
-
-        final String id1 = id;
         electionTerm = new ElectionTerm() {
-            /**
-             * Identifier of the actor whose election term information this is
-             */
-            private final String id = id1;
             private long currentTerm = 1;
             private String votedFor = "";
 
@@ -81,6 +66,13 @@ public class MockRaftActorContext implements RaftActorContext {
         };
 
         configParams = new DefaultConfigParamsImpl();
+    }
+
+    public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){
+        this();
+        this.id = id;
+        this.system = system;
+        this.actor = actor;
 
         initReplicatedLog();
     }
index 20ee55519b945e42ffb3e06acf53b53d42a93556..2688d0195df662f004a4d4738bf16330387b194f 100644 (file)
@@ -11,8 +11,11 @@ import java.io.IOException;
 import java.util.Dictionary;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.cm.Configuration;
 import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.service.cm.ConfigurationEvent;
+import org.osgi.service.cm.ConfigurationListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,12 +28,19 @@ import org.slf4j.LoggerFactory;
 public class DatastoreContextConfigAdminOverlay implements AutoCloseable {
     public static final String CONFIG_ID = "org.opendaylight.controller.cluster.datastore";
 
+    public static interface Listener {
+        void onDatastoreContextUpdated(DatastoreContext context);
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(DatastoreContextConfigAdminOverlay.class);
 
     private final DatastoreContextIntrospector introspector;
     private final BundleContext bundleContext;
+    private ServiceRegistration<?> configListenerServiceRef;
+    private Listener listener;
 
-    public DatastoreContextConfigAdminOverlay(DatastoreContextIntrospector introspector, BundleContext bundleContext) {
+    public DatastoreContextConfigAdminOverlay(DatastoreContextIntrospector introspector,
+            BundleContext bundleContext) {
         this.introspector = introspector;
         this.bundleContext = bundleContext;
 
@@ -40,9 +50,16 @@ public class DatastoreContextConfigAdminOverlay implements AutoCloseable {
             LOG.warn("No ConfigurationAdmin service found");
         } else {
             overlaySettings(configAdminServiceReference);
+
+            configListenerServiceRef = bundleContext.registerService(ConfigurationListener.class.getName(),
+                    new DatastoreConfigurationListener(), null);
         }
     }
 
+    public void setListener(Listener listener) {
+        this.listener = listener;
+    }
+
     private void overlaySettings(ServiceReference<ConfigurationAdmin> configAdminServiceReference) {
         try {
             ConfigurationAdmin configAdmin = bundleContext.getService(configAdminServiceReference);
@@ -53,7 +70,11 @@ public class DatastoreContextConfigAdminOverlay implements AutoCloseable {
 
                 LOG.debug("Overlaying settings: {}", properties);
 
-                introspector.update(properties);
+                if(introspector.update(properties)) {
+                    if(listener != null) {
+                        listener.onDatastoreContextUpdated(introspector.getContext());
+                    }
+                }
             } else {
                 LOG.debug("No Configuration found for {}", CONFIG_ID);
             }
@@ -72,5 +93,20 @@ public class DatastoreContextConfigAdminOverlay implements AutoCloseable {
 
     @Override
     public void close() {
+        listener = null;
+
+        if(configListenerServiceRef != null) {
+            configListenerServiceRef.unregister();
+        }
+    }
+
+    private class DatastoreConfigurationListener implements ConfigurationListener {
+        @Override
+        public void configurationEvent(ConfigurationEvent event) {
+            if(CONFIG_ID.equals(event.getPid()) && event.getType() == ConfigurationEvent.CM_UPDATED) {
+                LOG.debug("configurationEvent: config {} was updated", CONFIG_ID);
+                overlaySettings(event.getReference());
+            }
+        }
     }
 }
index 3ca64210bea11986689765fdeee5936f2ba99bc3..0bbeefd6fa64f3c5db3caf2e79f655dc4450a7c0 100644 (file)
@@ -196,11 +196,13 @@ public class DatastoreContextIntrospector {
      * @param properties the properties to apply
      * @return true if the cached DatastoreContext was updated, false otherwise.
      */
-    public boolean update(Dictionary<String, Object> properties) {
+    public synchronized boolean update(Dictionary<String, Object> properties) {
         if(properties == null || properties.isEmpty()) {
             return false;
         }
 
+        LOG.debug("In update: properties: {}", properties);
+
         Builder builder = DatastoreContext.newBuilderFrom(context);
 
         final String dataStoreTypePrefix = context.getDataStoreType() + '.';
@@ -291,12 +293,12 @@ public class DatastoreContextIntrospector {
     }
 
     private Object constructorValueRecursively(Class<?> toType, Object fromValue) throws Exception {
-        LOG.debug("convertValueRecursively - toType: {}, fromValue {} ({})",
+        LOG.trace("convertValueRecursively - toType: {}, fromValue {} ({})",
                 toType.getSimpleName(), fromValue, fromValue.getClass().getSimpleName());
 
         Constructor<?> ctor = constructors.get(toType);
 
-        LOG.debug("Found {}", ctor);
+        LOG.trace("Found {}", ctor);
 
         if(ctor == null) {
             throw new IllegalArgumentException(String.format("Constructor not found for type %s", toType));
index 434efc9111f48e3910af12386d3cd1d4f5e7360a..51182deb1dcd6586ee1d421d477e3072363d4a83 100644 (file)
@@ -34,7 +34,8 @@ import org.slf4j.LoggerFactory;
 /**
  *
  */
-public class DistributedDataStore implements DOMStore, SchemaContextListener, AutoCloseable {
+public class DistributedDataStore implements DOMStore, SchemaContextListener,
+        DatastoreContextConfigAdminOverlay.Listener, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
     public static final int REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR = 24; // 24 times the usual operation timeout
@@ -127,6 +128,14 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
         actorContext.setSchemaContext(schemaContext);
     }
 
+    @Override
+    public void onDatastoreContextUpdated(DatastoreContext context) {
+        LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreType());
+
+        actorContext.setDatastoreContext(context);
+        datastoreConfigMXBean.setContext(context);
+    }
+
     @Override
     public void close() {
         datastoreConfigMXBean.unregisterMBean();
index f3a55c55a720fe21e8718cc4a346c13706074f49..ee9f4f3ad5276e5586da6135902835f0896f2b6f 100644 (file)
@@ -34,6 +34,8 @@ public class DistributedDataStoreFactory {
         final DistributedDataStore dataStore = new DistributedDataStore(actorSystem,
                 new ClusterWrapperImpl(actorSystem), config, introspector.getContext());
 
+        overlay.setListener(dataStore);
+
         ShardStrategyFactory.setConfiguration(config);
         schemaService.registerSchemaContextListener(dataStore);
 
index 0672023fcbfe0e207036006d85ca9d1509a94a38..c509580ac657f920564cc2c4db1a43f91fa7d325 100644 (file)
@@ -113,9 +113,9 @@ public class Shard extends RaftActor {
     private final List<DelayedListenerRegistration> delayedListenerRegistrations =
                                                                        Lists.newArrayList();
 
-    private final DatastoreContext datastoreContext;
+    private DatastoreContext datastoreContext;
 
-    private final DataPersistenceProvider dataPersistenceProvider;
+    private DataPersistenceProvider dataPersistenceProvider;
 
     private SchemaContext schemaContext;
 
@@ -123,7 +123,7 @@ public class Shard extends RaftActor {
 
     private final ShardCommitCoordinator commitCoordinator;
 
-    private final long transactionCommitTimeout;
+    private long transactionCommitTimeout;
 
     private Cancellable txCommitTimeoutCheckSchedule;
 
@@ -175,8 +175,7 @@ public class Shard extends RaftActor {
         commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
                 datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
 
-        transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
-                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+        setTransactionCommitTimeout();
 
         // create a notifier actor for each cluster member
         roleChangeNotifier = createRoleChangeNotifier(name.toString());
@@ -185,6 +184,11 @@ public class Shard extends RaftActor {
                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
     }
 
+    private void setTransactionCommitTimeout() {
+        transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
+                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+    }
+
     private static Map<String, String> mapPeerAddresses(
         final Map<ShardIdentifier, String> peerAddresses) {
         Map<String, String> map = new HashMap<>();
@@ -216,11 +220,15 @@ public class Shard extends RaftActor {
 
     @Override
     public void postStop() {
+        LOG.info("Stopping Shard {}", persistenceId());
+
         super.postStop();
 
         if(txCommitTimeoutCheckSchedule != null) {
             txCommitTimeoutCheckSchedule.cancel();
         }
+
+        shardMBean.unregisterMBean();
     }
 
     @Override
@@ -278,6 +286,8 @@ public class Shard extends RaftActor {
                         resolved.getPeerAddress());
             } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
                 handleTransactionCommitTimeoutCheck();
+            } else if(message instanceof DatastoreContext) {
+                onDatastoreContext((DatastoreContext)message);
             } else {
                 super.onReceiveCommand(message);
             }
@@ -291,6 +301,24 @@ public class Shard extends RaftActor {
         return roleChangeNotifier;
     }
 
+    private void onDatastoreContext(DatastoreContext context) {
+        datastoreContext = context;
+
+        commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity());
+
+        setTransactionCommitTimeout();
+
+        if(datastoreContext.isPersistent() &&
+                dataPersistenceProvider instanceof NonPersistentRaftDataProvider) {
+            dataPersistenceProvider = new PersistentDataProvider();
+        } else if(!datastoreContext.isPersistent() &&
+                dataPersistenceProvider instanceof PersistentDataProvider) {
+            dataPersistenceProvider = new NonPersistentRaftDataProvider();
+        }
+
+        updateConfigParams(datastoreContext.getShardRaftConfig());
+    }
+
     private void handleTransactionCommitTimeoutCheck() {
         CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
         if(cohortEntry != null) {
index 8b95404c4e6bdd06e2f4bb9d234eeab4465dd81f..951bc22545804af11bbc37fd8ef28d03cd53fc02 100644 (file)
@@ -34,7 +34,7 @@ public class ShardCommitCoordinator {
 
     private final Queue<CohortEntry> queuedCohortEntries;
 
-    private final int queueCapacity;
+    private int queueCapacity;
 
     private final Logger log;
 
@@ -54,6 +54,10 @@ public class ShardCommitCoordinator {
         queuedCohortEntries = new LinkedList<>();
     }
 
+    public void setQueueCapacity(int queueCapacity) {
+        this.queueCapacity = queueCapacity;
+    }
+
     /**
      * This method caches a cohort entry for the given transactions ID in preparation for the
      * subsequent 3-phase commit.
index 426a2e0934f173560647a13569b22d1e06f632b2..775cae35e22843cafd223bce22eb5232a230868f 100644 (file)
@@ -39,7 +39,6 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
@@ -90,9 +89,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final String shardDispatcherPath;
 
-    private ShardManagerInfoMBean mBean;
+    private ShardManagerInfo mBean;
 
-    private final DatastoreContext datastoreContext;
+    private DatastoreContext datastoreContext;
 
     private Collection<String> knownModules = Collections.emptySet();
 
@@ -132,6 +131,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext));
     }
 
+    @Override
+    public void postStop() {
+        LOG.info("Stopping ShardManager");
+
+        mBean.unregisterMBean();
+    }
+
     @Override
     public void handleCommand(Object message) throws Exception {
         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
@@ -148,6 +154,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             memberRemoved((ClusterEvent.MemberRemoved) message);
         } else if(message instanceof ClusterEvent.UnreachableMember) {
             ignoreMessage(message);
+        } else if(message instanceof DatastoreContext) {
+            onDatastoreContext((DatastoreContext)message);
         } else{
             unknownMessage(message);
         }
@@ -258,6 +266,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private void onDatastoreContext(DatastoreContext context) {
+        datastoreContext = context;
+        for (ShardInformation info : localShards.values()) {
+            if (info.getActor() != null) {
+                info.getActor().tell(datastoreContext, getSelf());
+            }
+        }
+    }
+
     /**
      * Notifies all the local shards of a change in the schema context
      *
index 26e6318f6d4d14aa5944a909696bbaa8b5f7f207..7eede29b65690db530fd4b9cfb9acb130365fcb6 100644 (file)
@@ -85,18 +85,19 @@ public class ActorContext {
     private final ActorRef shardManager;
     private final ClusterWrapper clusterWrapper;
     private final Configuration configuration;
-    private final DatastoreContext datastoreContext;
-    private final FiniteDuration operationDuration;
-    private final Timeout operationTimeout;
+    private DatastoreContext datastoreContext;
+    private FiniteDuration operationDuration;
+    private Timeout operationTimeout;
     private final String selfAddressHostPort;
-    private final RateLimiter txRateLimiter;
+    private RateLimiter txRateLimiter;
     private final MetricRegistry metricRegistry = new MetricRegistry();
     private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
     private final int transactionOutstandingOperationLimit;
-    private final Timeout transactionCommitOperationTimeout;
+    private Timeout transactionCommitOperationTimeout;
     private final Dispatchers dispatchers;
 
     private volatile SchemaContext schemaContext;
+    private volatile boolean updated;
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
@@ -112,14 +113,9 @@ public class ActorContext {
         this.clusterWrapper = clusterWrapper;
         this.configuration = configuration;
         this.datastoreContext = datastoreContext;
-        this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
 
-        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
-        operationTimeout = new Timeout(operationDuration);
-        transactionCommitOperationTimeout =  new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
-                TimeUnit.SECONDS));
-
+        setCachedProperties();
 
         Address selfAddress = clusterWrapper.getSelfAddress();
         if (selfAddress != null && !selfAddress.host().isEmpty()) {
@@ -133,6 +129,16 @@ public class ActorContext {
 
     }
 
+    private void setCachedProperties() {
+        txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
+
+        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
+        operationTimeout = new Timeout(operationDuration);
+
+        transactionCommitOperationTimeout =  new Timeout(Duration.create(
+                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
+    }
+
     public DatastoreContext getDatastoreContext() {
         return datastoreContext;
     }
@@ -157,7 +163,25 @@ public class ActorContext {
         this.schemaContext = schemaContext;
 
         if(shardManager != null) {
-            shardManager.tell(new UpdateSchemaContext(schemaContext), null);
+            shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
+        }
+    }
+
+    public void setDatastoreContext(DatastoreContext context) {
+        this.datastoreContext = context;
+        setCachedProperties();
+
+        // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
+        // will be published immediately even though they may not be immediately visible to other
+        // threads due to unsynchronized reads. That's OK though - we're going for eventual
+        // consistency here as immediately visible updates to these members aren't critical. These
+        // members could've been made volatile but wanted to avoid volatile reads as these are
+        // accessed often and updates will be infrequent.
+
+        updated = true;
+
+        if(shardManager != null) {
+            shardManager.tell(context, ActorRef.noSender());
         }
     }
 
index 3693c01b42348f59d3b26daa3ed184138e3d2bed..ecfcdefcb28d2eeecc24e934180372178ba7678f 100644 (file)
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import java.io.IOException;
 import java.util.Dictionary;
 import java.util.Hashtable;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.cm.Configuration;
 import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.service.cm.ConfigurationEvent;
+import org.osgi.service.cm.ConfigurationListener;
 
 /**
  * Unit tests for DatastoreContextConfigAdminOverlay.
  *
  * @author Thomas Pantelis
  */
+@SuppressWarnings("unchecked")
 public class DatastoreContextConfigAdminOverlayTest {
 
-    @SuppressWarnings("unchecked")
-    @Test
-    public void test() throws IOException {
-        BundleContext mockBundleContext = mock(BundleContext.class);
-        ServiceReference<ConfigurationAdmin> mockServiceRef = mock(ServiceReference.class);
-        ConfigurationAdmin mockConfigAdmin = mock(ConfigurationAdmin.class);
-        Configuration mockConfig = mock(Configuration.class);
-        DatastoreContextIntrospector mockIntrospector = mock(DatastoreContextIntrospector.class);
+    @Mock
+    private BundleContext mockBundleContext;
+
+    @Mock
+    private ServiceReference<ConfigurationAdmin> mockConfigAdminServiceRef;
+
+    @Mock
+    private ConfigurationAdmin mockConfigAdmin;
+
+    @Mock
+    private Configuration mockConfig;
+
+    @Mock
+    private DatastoreContextIntrospector mockIntrospector;
+
+    @Mock
+    private ServiceRegistration<?> configListenerServiceReg;
+
+    @Before
+    public void setup() throws IOException {
+        MockitoAnnotations.initMocks(this);
 
-        doReturn(mockServiceRef).when(mockBundleContext).getServiceReference(ConfigurationAdmin.class);
-        doReturn(mockConfigAdmin).when(mockBundleContext).getService(mockServiceRef);
+        doReturn(mockConfigAdminServiceRef).when(mockBundleContext).getServiceReference(ConfigurationAdmin.class);
+        doReturn(mockConfigAdmin).when(mockBundleContext).getService(mockConfigAdminServiceRef);
+        doReturn(configListenerServiceReg).when(mockBundleContext).registerService(
+                eq(ConfigurationListener.class.getName()), any(), any(Dictionary.class));
 
         doReturn(mockConfig).when(mockConfigAdmin).getConfiguration(DatastoreContextConfigAdminOverlay.CONFIG_ID);
 
         doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(mockConfig).getPid();
 
+    }
+
+    @Test
+    public void testUpdateOnConstruction() {
+        Dictionary<String, Object> properties = new Hashtable<>();
+        properties.put("property", "value");
+        doReturn(properties).when(mockConfig).getProperties();
+
+        DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+                mockIntrospector, mockBundleContext);
+
+        verify(mockIntrospector).update(properties);
+
+        verify(mockBundleContext).ungetService(mockConfigAdminServiceRef);
+
+        overlay.close();
+    }
+
+    @Test
+    public void testUpdateOnConfigurationEvent() {
+        DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+                mockIntrospector, mockBundleContext);
+
+        reset(mockIntrospector);
+
+        DatastoreContext context = DatastoreContext.newBuilder().build();
+        doReturn(context).when(mockIntrospector).getContext();
+
+        DatastoreContextConfigAdminOverlay.Listener mockListener =
+                mock(DatastoreContextConfigAdminOverlay.Listener.class);
+
+        overlay.setListener(mockListener);
+
         Dictionary<String, Object> properties = new Hashtable<>();
         properties.put("property", "value");
         doReturn(properties).when(mockConfig).getProperties();
 
-        try(DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
-                mockIntrospector, mockBundleContext)) {
-        }
+        doReturn(true).when(mockIntrospector).update(properties);
+
+        ArgumentCaptor<ConfigurationListener> configListener =
+                ArgumentCaptor.forClass(ConfigurationListener.class);
+        verify(mockBundleContext).registerService(eq(ConfigurationListener.class.getName()),
+                configListener.capture(), any(Dictionary.class));
+
+        ConfigurationEvent configEvent = mock(ConfigurationEvent.class);
+        doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(configEvent).getPid();
+        doReturn(mockConfigAdminServiceRef).when(configEvent).getReference();
+        doReturn(ConfigurationEvent.CM_UPDATED).when(configEvent).getType();
+
+        configListener.getValue().configurationEvent(configEvent);
 
         verify(mockIntrospector).update(properties);
 
-        verify(mockBundleContext).ungetService(mockServiceRef);
+        verify(mockListener).onDatastoreContextUpdated(context);
+
+        verify(mockBundleContext, times(2)).ungetService(mockConfigAdminServiceRef);
+
+        overlay.close();
+
+        verify(configListenerServiceReg).unregister();
+    }
+
+    @Test
+    public void testConfigurationEventWithDifferentPid() {
+        DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+                mockIntrospector, mockBundleContext);
+
+        reset(mockIntrospector);
+
+        ArgumentCaptor<ConfigurationListener> configListener =
+                ArgumentCaptor.forClass(ConfigurationListener.class);
+        verify(mockBundleContext).registerService(eq(ConfigurationListener.class.getName()),
+                configListener.capture(), any(Dictionary.class));
+
+        ConfigurationEvent configEvent = mock(ConfigurationEvent.class);
+        doReturn("other-pid").when(configEvent).getPid();
+        doReturn(mockConfigAdminServiceRef).when(configEvent).getReference();
+        doReturn(ConfigurationEvent.CM_UPDATED).when(configEvent).getType();
+
+        configListener.getValue().configurationEvent(configEvent);
+
+        verify(mockIntrospector, times(0)).update(any(Dictionary.class));
+
+        overlay.close();
+    }
+
+    @Test
+    public void testConfigurationEventWithNonUpdateEventType() {
+        DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+                mockIntrospector, mockBundleContext);
+
+        reset(mockIntrospector);
+
+        ArgumentCaptor<ConfigurationListener> configListener =
+                ArgumentCaptor.forClass(ConfigurationListener.class);
+        verify(mockBundleContext).registerService(eq(ConfigurationListener.class.getName()),
+                configListener.capture(), any(Dictionary.class));
+
+        ConfigurationEvent configEvent = mock(ConfigurationEvent.class);
+        doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(configEvent).getPid();
+        doReturn(mockConfigAdminServiceRef).when(configEvent).getReference();
+        doReturn(ConfigurationEvent.CM_DELETED).when(configEvent).getType();
+
+        configListener.getValue().configurationEvent(configEvent);
+
+        verify(mockIntrospector, times(0)).update(any(Dictionary.class));
+
+        overlay.close();
     }
 }
index bbbc4db5e351f99df3856e807b146f18ac1899d1..1ebd1b91dd474bf14ce0c5d9c32a9eb8cce64d84 100644 (file)
@@ -1565,6 +1565,31 @@ public class ShardTest extends AbstractActorTest {
 
     }
 
+    @Test
+    public void testOnDatastoreContext() {
+        new ShardTestKit(getSystem()) {{
+            dataStoreContextBuilder.persistent(true);
+
+            TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
+
+            assertEquals("isRecoveryApplicable", true,
+                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+            waitUntilLeader(shard);
+
+            shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
+
+            assertEquals("isRecoveryApplicable", false,
+                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+            shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
+
+            assertEquals("isRecoveryApplicable", true,
+                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
 
     private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
index 3c6a0cef5c605fbb23e3c9501676f67d95805e9d..fd41c49390b484fd0d4343befa2f920d542e2f74 100644 (file)
@@ -17,6 +17,7 @@ import com.google.common.base.Optional;
 import com.typesafe.config.ConfigFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.time.StopWatch;
+import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
@@ -339,4 +340,29 @@ public class ActorContextTest extends AbstractActorTest{
 
     }
 
+    @Test
+    public void testSetDatastoreContext() {
+        new JavaTestKit(getSystem()) {{
+            ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
+                            mock(Configuration.class), DatastoreContext.newBuilder().
+                                operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
+
+            assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
+            assertEquals("getTransactionCommitOperationTimeout", 7,
+                    actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+
+            DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
+                    shardTransactionCommitTimeoutInSeconds(8).build();
+
+            actorContext.setDatastoreContext(newContext);
+
+            expectMsgClass(duration("5 seconds"), DatastoreContext.class);
+
+            Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
+
+            assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
+            assertEquals("getTransactionCommitOperationTimeout", 8,
+                    actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+        }};
+    }
 }
index 9ed3d276a3b59c3d15fd3417888f42ddf70f6073..3dd752ec30f14eecd43d8934d6ecf210687d9c4a 100644 (file)
@@ -3,4 +3,4 @@ org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a
 org.slf4j.simpleLogger.logFile=System.out
 org.slf4j.simpleLogger.showShortLogName=true
 org.slf4j.simpleLogger.levelInBrackets=true
-org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=trace
\ No newline at end of file
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug
\ No newline at end of file