- Shard can work with both Persistent and NonPersistent messages which change the state of the InMemoryDataStore
- OSGi related changes so that the DistributedDataStore can load properly in the controller
- Other changes so that there are no startup errors (like creating a single actor system)
Change-Id: Ic9b211e2007002fa800b980e8c94463241e36b59
Signed-off-by: Moiz Raja <moraja@cisco.com>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
+
</dependencies>
<build>
<Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
<Export-package></Export-package>
<Private-Package></Private-Package>
+ <Import-Package>!*snappy;!org.jboss.*;*</Import-Package>
+ <Embed-Dependency>
+ !sal*;
+ !*config-api*;
+ !*testkit*;
+ akka*;
+ *leveldb*;
+ *config*;
+ *hawt*;
+ *protobuf*;
+ *netty*;
+ *uncommons*;
+ *scala*;
+ </Embed-Dependency>
+ <Embed-Transitive>true</Embed-Transitive>
</instructions>
</configuration>
</plugin>
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSystem;
+import com.typesafe.config.ConfigFactory;
+
+public class ActorSystemFactory {
+ private static final ActorSystem actorSystem =
+ ActorSystem.create("opendaylight-cluster", ConfigFactory
+ .load().getConfig("ODLCluster"));
+
+ public static final ActorSystem getInstance(){
+ return actorSystem;
+ }
+}
/**
*
*/
-public class DistributedDataStore implements DOMStore, SchemaContextListener {
+public class DistributedDataStore implements DOMStore, SchemaContextListener, AutoCloseable {
private static final Logger
LOG = LoggerFactory.getLogger(DistributedDataStore.class);
actorContext.getShardManager().tell(
new UpdateSchemaContext(schemaContext), null);
}
+
+ @Override public void close() throws Exception {
+ actorContext.shutdown();
+
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.sal.core.api.model.SchemaService;
+
+public class DistributedDataStoreFactory {
+ public static DistributedDataStore createInstance(String name, SchemaService schemaService){
+ final DistributedDataStore dataStore =
+ new DistributedDataStore(ActorSystemFactory.getInstance(), name);
+ schemaService
+ .registerSchemaServiceListener(dataStore);
+ return dataStore;
+
+ }
+}
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.NonPersistent;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
private final LoggingAdapter log =
Logging.getLogger(getContext().system(), this);
+ // By default persistent will be true and can be turned off using the system
+ // property persistent
+ private final boolean persistent;
+
private Shard(String name) {
- log.info("Creating shard : {}", name );
+
+ String setting = System.getProperty("shard.persistent");
+ this.persistent = !"false".equals(setting);
+
+ log.info("Creating shard : {} persistent : {}", name , persistent);
store = new InMemoryDOMDataStore(name, storeExecutor);
}
});
}
+
@Override
public void onReceive(Object message) throws Exception {
log.debug("Received message {}", message);
} else if (message instanceof ForwardedCommitTransaction) {
handleForwardedCommit((ForwardedCommitTransaction) message);
} else if (message instanceof Persistent) {
- commit((Persistent) message);
+ commit((Modification) ((Persistent) message).payload());
} else if (message instanceof CreateTransaction) {
createTransaction();
+ } else if(message instanceof NonPersistent){
+ commit((Modification) ((NonPersistent) message).payload());
}
}
getSelf());
}
- private void commit(Persistent message) {
- Modification modification = (Modification) message.payload();
+ private void commit(Modification modification) {
DOMStoreThreePhaseCommitCohort cohort =
modificationToCohort.remove(modification);
if (cohort == null) {
log.info("received forwarded transaction");
modificationToCohort
.put(message.getModification(), message.getCohort());
- getSelf().forward(Persistent.create(message.getModification()),
- getContext());
+ if(persistent) {
+ getSelf().forward(Persistent.create(message.getModification()),
+ getContext());
+ } else {
+ getSelf().forward(NonPersistent.create(message.getModification()),
+ getContext());
+ }
}
private void updateSchemaContext(UpdateSchemaContext message) {
});
}
+
@Override
public void onReceive(Object message) throws Exception {
log.debug("Received message {}", message);
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * A NonPersistent message is to be used when we want to trigger state update
+ * for an actor without actually persisting the data to disk. This could be
+ * useful for test purposes.
+ */
+public class NonPersistent {
+ private final Object payload;
+
+ public NonPersistent(Object payload){
+ this.payload = payload;
+ }
+
+ public Object payload() {
+ return payload;
+ }
+
+ public static NonPersistent create(Object payload){
+ return new NonPersistent(payload);
+ }
+}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
import akka.util.Timeout;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
return executeRemoteOperation(primary, message, duration);
}
+ public void shutdown() {
+ shardManager.tell(PoisonPill.getInstance(), null);
+ actorSystem.shutdown();
+ }
}
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
-import akka.actor.ActorSystem;
-import com.typesafe.config.ConfigFactory;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-public class DistributedConfigDataStoreProviderModule extends org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModule {
- public DistributedConfigDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+public class DistributedConfigDataStoreProviderModule extends
+ org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModule {
+ public DistributedConfigDataStoreProviderModule(
+ org.opendaylight.controller.config.api.ModuleIdentifier identifier,
+ org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
- public DistributedConfigDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.config.distributed_datastore_provider.DistributedConfigDataStoreProviderModule oldModule, java.lang.AutoCloseable oldInstance) {
+ public DistributedConfigDataStoreProviderModule(
+ org.opendaylight.controller.config.api.ModuleIdentifier identifier,
+ org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,
+ org.opendaylight.controller.config.yang.config.distributed_datastore_provider.DistributedConfigDataStoreProviderModule oldModule,
+ java.lang.AutoCloseable oldInstance) {
super(identifier, dependencyResolver, oldModule, oldInstance);
}
@Override
public java.lang.AutoCloseable createInstance() {
- final ActorSystem actorSystem = ActorSystem.create("opendaylight-cluster-system", ConfigFactory
- .load().getConfig("ODLCluster"));
-
-
- final DistributedDataStore configDatastore = new DistributedDataStore(actorSystem, "config");
- getSchemaServiceDependency().registerSchemaServiceListener(configDatastore);
-
- final class AutoCloseableDistributedDataStore implements AutoCloseable {
-
- @Override
- public void close() throws Exception {
- actorSystem.shutdown();
- }
- }
-
- return new AutoCloseableDistributedDataStore();
+ return DistributedDataStoreFactory
+ .createInstance("config", getSchemaServiceDependency());
}
}
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
-import akka.actor.ActorSystem;
-import com.typesafe.config.ConfigFactory;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-public class DistributedOperationalDataStoreProviderModule extends org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModule {
- public DistributedOperationalDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+public class DistributedOperationalDataStoreProviderModule extends
+ org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModule {
+ public DistributedOperationalDataStoreProviderModule(
+ org.opendaylight.controller.config.api.ModuleIdentifier identifier,
+ org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
- public DistributedOperationalDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.config.distributed_datastore_provider.DistributedOperationalDataStoreProviderModule oldModule, java.lang.AutoCloseable oldInstance) {
+ public DistributedOperationalDataStoreProviderModule(
+ org.opendaylight.controller.config.api.ModuleIdentifier identifier,
+ org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,
+ org.opendaylight.controller.config.yang.config.distributed_datastore_provider.DistributedOperationalDataStoreProviderModule oldModule,
+ java.lang.AutoCloseable oldInstance) {
super(identifier, dependencyResolver, oldModule, oldInstance);
}
// add custom validation form module attributes here.
}
- @Override
- public java.lang.AutoCloseable createInstance() {
- final ActorSystem actorSystem = ActorSystem.create("opendaylight-cluster", ConfigFactory
- .load().getConfig("ODLCluster"));
- final DistributedDataStore operationalStore = new DistributedDataStore(actorSystem, "operational");
- getSchemaServiceDependency().registerSchemaServiceListener(operationalStore);
-
- final class AutoCloseableDistributedDataStore implements AutoCloseable {
-
- @Override
- public void close() throws Exception {
- actorSystem.shutdown();
- }
+ @Override
+ public java.lang.AutoCloseable createInstance() {
+ return DistributedDataStoreFactory
+ .createInstance("operational", getSchemaServiceDependency());
}
- return new AutoCloseableDistributedDataStore();
- }
-
}
import org.junit.BeforeClass;
public abstract class AbstractActorTest {
- private static ActorSystem system;
+ private static ActorSystem system;
- @BeforeClass
- public static void setUpClass(){
- system = ActorSystem.create("test");
- }
+ @BeforeClass
+ public static void setUpClass() {
+ System.setProperty("shard.persistent", "false");
+ system = ActorSystem.create("test");
+ }
- @AfterClass
- public static void tearDownClass(){
- JavaTestKit.shutdownActorSystem(system);
- system = null;
- }
+ @AfterClass
+ public static void tearDownClass() {
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
- protected ActorSystem getSystem(){
- return system;
- }
+ protected ActorSystem getSystem() {
+ return system;
+ }
}