DistributedDataStoreFactory.destroyInstance(this);
}
- @VisibleForTesting
- ActorContext getActorContext() {
+ public ActorContext getActorContext() {
return actorContext;
}
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2;
}
- public static Props props(final ShardIdentifier name,
- final Map<String, String> peerAddresses,
- final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- Preconditions.checkNotNull(name, "name should not be null");
- Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
- Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
- Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
-
+ public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
+ final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
}
leaderPayloadVersion);
}
- private void onDatastoreContext(DatastoreContext context) {
+ protected void onDatastoreContext(DatastoreContext context) {
datastoreContext = context;
commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity());
return commitCoordinator;
}
+ protected abstract static class AbstractShardCreator implements Creator<Shard> {
+ private static final long serialVersionUID = 1L;
- private static class ShardCreator implements Creator<Shard> {
+ protected final ShardIdentifier name;
+ protected final Map<String, String> peerAddresses;
+ protected final DatastoreContext datastoreContext;
+ protected final SchemaContext schemaContext;
- private static final long serialVersionUID = 1L;
+ protected AbstractShardCreator(final ShardIdentifier name, final Map<String, String> peerAddresses,
+ final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
+ this.name = Preconditions.checkNotNull(name, "name should not be null");
+ this.peerAddresses = Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
+ this.datastoreContext = Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
+ this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+ }
+ }
- final ShardIdentifier name;
- final Map<String, String> peerAddresses;
- final DatastoreContext datastoreContext;
- final SchemaContext schemaContext;
+ private static class ShardCreator extends AbstractShardCreator {
+ private static final long serialVersionUID = 1L;
ShardCreator(final ShardIdentifier name, final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- this.name = name;
- this.peerAddresses = peerAddresses;
- this.datastoreContext = datastoreContext;
- this.schemaContext = schemaContext;
+ super(name, peerAddresses, datastoreContext, schemaContext);
}
@Override
*/
package org.opendaylight.controller.cluster.datastore.entityownership;
+import akka.actor.ActorRef;
+import akka.dispatch.OnComplete;
+import akka.util.Timeout;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
/**
* The distributed implementation of the EntityOwnershipService.
*/
public class DistributedEntityOwnershipService implements EntityOwnershipService, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(DistributedEntityOwnershipService.class);
+ static final String ENTITY_OWNERSHIP_SHARD_NAME = "entity-ownership";
+ private static final Timeout MESSAGE_TIMEOUT = new Timeout(1, TimeUnit.MINUTES);
private final DistributedDataStore datastore;
}
public void start() {
- LOG.info("DistributedEntityOwnershipService starting");
+ ActorRef shardManagerActor = datastore.getActorContext().getShardManager();
+
+ CreateShard createShard = new CreateShard(ENTITY_OWNERSHIP_SHARD_NAME,
+ datastore.getActorContext().getConfiguration().getUniqueMemberNamesForAllShards(),
+ new EntityOwnershipShardPropsCreator(), null);
+
+ Future<Object> createFuture = datastore.getActorContext().executeOperationAsync(shardManagerActor,
+ createShard, MESSAGE_TIMEOUT);
+
+ createFuture.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if(failure != null) {
+ LOG.error("Failed to create {} shard", ENTITY_OWNERSHIP_SHARD_NAME);
+ } else {
+ LOG.info("Successfully created {} shard", ENTITY_OWNERSHIP_SHARD_NAME);
+ }
+ }
+ }, datastore.getActorContext().getClientDispatcher());
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import akka.actor.Props;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+/**
+ * Special Shard for EntityOwnership.
+ *
+ * @author Thomas Pantelis
+ */
+public class EntityOwnershipShard extends Shard {
+
+ private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
+ return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
+ }
+
+ protected EntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
+ DatastoreContext datastoreContext, SchemaContext schemaContext) {
+ super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
+ }
+
+ @Override
+ protected void onDatastoreContext(DatastoreContext context) {
+ super.onDatastoreContext(noPersistenceDatastoreContext(context));
+ }
+
+ public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
+ final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
+ return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext));
+ }
+
+ private static class Creator extends AbstractShardCreator {
+ private static final long serialVersionUID = 1L;
+
+ Creator(final ShardIdentifier name, final Map<String, String> peerAddresses,
+ final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
+ super(name, peerAddresses, datastoreContext, schemaContext);
+ }
+
+ @Override
+ public Shard create() throws Exception {
+ return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import akka.actor.Props;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.ShardPropsCreator;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+/**
+ * Implementation of ShardPropsCreator that creates a Props instance for the EntityOwnershipShard class.
+ *
+ * @author Thomas Pantelis
+ */
+public class EntityOwnershipShardPropsCreator implements ShardPropsCreator {
+
+ @Override
+ public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
+ DatastoreContext datastoreContext, SchemaContext schemaContext) {
+ return EntityOwnershipShard.props(shardId, peerAddresses, datastoreContext, schemaContext);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import static org.junit.Assert.assertNotNull;
+import akka.actor.ActorRef;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Unit tests for DistributedEntityOwnershipService.
+ *
+ * @author Thomas Pantelis
+ */
+public class DistributedEntityOwnershipServiceTest extends AbstractActorTest {
+ private static int ID_COUNTER = 1;
+
+ private final String dataStoreType = "config" + ID_COUNTER++;
+ private DistributedEntityOwnershipService service;
+ private DistributedDataStore dataStore;
+
+ @Before
+ public void setUp() throws Exception {
+ DatastoreContext datastoreContext = DatastoreContext.newBuilder().dataStoreType(dataStoreType).
+ shardInitializationTimeout(10, TimeUnit.SECONDS).build();
+ dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(),
+ new MockConfiguration(Collections.<String, List<String>>emptyMap()), datastoreContext );
+
+ dataStore.onGlobalContextUpdated(TestModel.createTestContext());
+
+ service = new DistributedEntityOwnershipService(dataStore);
+ }
+
+ @Test
+ public void testEntityOwnershipShardCreated() throws Exception {
+ service.start();
+
+ Future<ActorRef> future = dataStore.getActorContext().findLocalShardAsync(
+ DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME);
+ ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
+ assertNotNull(DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME + " not found", shardActor);
+ }
+
+ @Test
+ public void testRegisterCandidate() {
+ }
+
+ @Test
+ public void testRegisterListener() {
+ }
+}