lastLogEntryIndex = lastLogEntry.getIndex();
lastLogEntryTerm = lastLogEntry.getTerm();
} else {
- LOG.warn("Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
+ LOG.debug("Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
lastAppliedIndex, lastAppliedTerm);
}
<configuration>
<instructions>
<Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+ <Bundle-Activator>org.opendaylight.controller.cluster.datastore.osgi.Activator</Bundle-Activator>
<Export-Package></Export-Package>
<Import-Package>!*snappy;!org.jboss.*;!com.jcraft.*;!*jetty*;!sun.security.*;*</Import-Package>
<!--
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class looks for a previously saved data store backup file in a directory and, if found, de-serializes
+ * the DatastoreSnapshot instances. This class has a static singleton that is created on bundle activation.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreSnapshotRestore {
+ private static final Logger LOG = LoggerFactory.getLogger(DatastoreSnapshotRestore.class);
+
+ private static AtomicReference<DatastoreSnapshotRestore> instance = new AtomicReference<>();
+
+ private final String restoreDirectoryPath;
+ private final Map<String, DatastoreSnapshot> datastoreSnapshots = new ConcurrentHashMap<>();
+ private final AtomicBoolean initialized = new AtomicBoolean();
+
+ public static void createInstance(String restoreDirectoryPath) {
+ instance.compareAndSet(null, new DatastoreSnapshotRestore(restoreDirectoryPath));
+ }
+
+ public static void removeInstance() {
+ instance.set(null);
+ }
+
+ public static DatastoreSnapshotRestore instance() {
+ DatastoreSnapshotRestore localInstance = instance.get();
+ return Preconditions.checkNotNull(localInstance, "DatastoreSnapshotRestore instance was not created");
+ }
+
+ private DatastoreSnapshotRestore(String restoreDirectoryPath) {
+ this.restoreDirectoryPath = Preconditions.checkNotNull(restoreDirectoryPath);
+ }
+
+ private void initialize() {
+ if(!initialized.compareAndSet(false, true)) {
+ return;
+ }
+
+ File restoreDirectoryFile = new File(restoreDirectoryPath);
+
+ String[] files = restoreDirectoryFile.list();
+ if(files == null || files.length == 0) {
+ LOG.debug("Restore directory {} does not exist or is empty", restoreDirectoryFile);
+ return;
+ }
+
+ if(files.length > 1) {
+ LOG.error("Found {} files in clustered datastore restore directory {} - expected 1. No restore will be attempted",
+ files.length, restoreDirectoryFile);
+ return;
+ }
+
+ File restoreFile = new File(restoreDirectoryFile, files[0]);
+
+ LOG.info("Clustered datastore will be restored from file {}", restoreFile);
+
+ try(FileInputStream fis = new FileInputStream(restoreFile)) {
+ DatastoreSnapshotList snapshots = deserialize(fis);
+ LOG.debug("Deserialized {} snapshots", snapshots.size());
+
+ for(DatastoreSnapshot snapshot: snapshots) {
+ datastoreSnapshots.put(snapshot.getType(), snapshot);
+ }
+ } catch (Exception e) {
+ LOG.error("Error reading clustered datastore restore file {}", restoreFile, e);
+ } finally {
+ if(!restoreFile.delete()) {
+ LOG.error("Could not delete clustered datastore restore file {}", restoreFile);
+ }
+ }
+ }
+
+ private DatastoreSnapshotList deserialize(InputStream inputStream) throws IOException, ClassNotFoundException {
+ try(ObjectInputStream ois = new ObjectInputStream(inputStream)) {
+ return (DatastoreSnapshotList) ois.readObject();
+ }
+ }
+
+ public DatastoreSnapshot getAndRemove(String datastoreType) {
+ initialize();
+ return datastoreSnapshots.remove(datastoreType);
+ }
+}
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXBeanImpl;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
private final ActorContext actorContext;
private final long waitTillReadyTimeInMillis;
-
private AutoCloseable closeable;
private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
private final TransactionContextFactory txContextFactory;
public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
- Configuration configuration, DatastoreContextFactory datastoreContextFactory) {
+ Configuration configuration, DatastoreContextFactory datastoreContextFactory,
+ DatastoreSnapshot restoreFromSnapshot) {
Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
- actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, cluster, configuration,
- datastoreContextFactory, shardDispatcher, shardManagerId, primaryShardInfoCache), cluster,
- configuration, datastoreContextFactory.getBaseDatastoreContext(), primaryShardInfoCache);
+
+ ShardManager.Builder builder = ShardManager.builder().cluster(cluster).configuration(configuration).
+ datastoreContextFactory(datastoreContextFactory).waitTillReadyCountdownLatch(waitTillReadyCountDownLatch).
+ primaryShardInfoCache(primaryShardInfoCache).restoreFromSnapshot(restoreFromSnapshot);
+
+ actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, builder, shardDispatcher,
+ shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(), primaryShardInfoCache);
this.waitTillReadyTimeInMillis =
actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
}
}
- private ActorRef createShardManager(ActorSystem actorSystem, ClusterWrapper cluster, Configuration configuration,
- DatastoreContextFactory datastoreContextFactory, String shardDispatcher,
- String shardManagerId, PrimaryShardInfoFutureCache primaryShardInfoCache){
+ private ActorRef createShardManager(ActorSystem actorSystem, ShardManager.Builder builder, String shardDispatcher,
+ String shardManagerId){
Exception lastException = null;
for(int i=0;i<100;i++) {
try {
- return actorSystem.actorOf(
- ShardManager.props(cluster, configuration, datastoreContextFactory, waitTillReadyCountDownLatch,
- primaryShardInfoCache).withDispatcher(shardDispatcher).withMailbox(
- ActorContext.MAILBOX), shardManagerId);
+ return actorSystem.actorOf(builder.props().withDispatcher(shardDispatcher).withMailbox(
+ ActorContext.MAILBOX), shardManagerId);
} catch (Exception e){
lastException = e;
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
import akka.actor.ActorSystem;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreFactory.class);
public static DistributedDataStore createInstance(SchemaService schemaService,
- DatastoreContext datastoreContext, ActorSystem actorSystem, BundleContext bundleContext) {
+ DatastoreContext datastoreContext, DatastoreSnapshot restoreFromSnapshot, ActorSystem actorSystem,
+ BundleContext bundleContext) {
LOG.info("Create data store instance of type : {}", datastoreContext.getDataStoreType());
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
final DistributedDataStore dataStore = new DistributedDataStore(actorSystem,
- new ClusterWrapperImpl(actorSystem), config, introspector.newContextFactory());
+ new ClusterWrapperImpl(actorSystem), config, introspector.newContextFactory(), restoreFromSnapshot);
overlay.setListener(dataStore);
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
+ private ShardSnapshot restoreFromSnapshot;
+
protected Shard(AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
this.name = builder.getId().toString();
this.datastoreContext = builder.getDatastoreContext();
+ this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
setPersistence(datastoreContext.isPersistent());
@Override
@Nonnull
protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
- return new ShardRecoveryCoordinator(store, store.getSchemaContext(), persistenceId(), LOG);
+ return new ShardRecoveryCoordinator(store, store.getSchemaContext(),
+ restoreFromSnapshot != null ? restoreFromSnapshot.getSnapshot() : null, persistenceId(), LOG);
}
@Override
protected void onRecoveryComplete() {
+ restoreFromSnapshot = null;
+
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());
private Map<String, String> peerAddresses = Collections.emptyMap();
private DatastoreContext datastoreContext;
private SchemaContext schemaContext;
+ private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
private volatile boolean sealed;
protected AbstractBuilder(Class<S> shardClass) {
return self();
}
+ public T restoreFromSnapshot(DatastoreSnapshot.ShardSnapshot restoreFromSnapshot) {
+ checkSealed();
+ this.restoreFromSnapshot = restoreFromSnapshot;
+ return self();
+ }
+
public ShardIdentifier getId() {
return id;
}
return schemaContext;
}
+ public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
+ return restoreFromSnapshot;
+ }
+
protected void verify() {
Preconditions.checkNotNull(id, "id should not be null");
Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
import akka.actor.SupervisorStrategy;
import akka.cluster.ClusterEvent;
import akka.dispatch.OnComplete;
-import akka.japi.Creator;
import akka.japi.Function;
import akka.persistence.RecoveryCompleted;
import akka.serialization.Serialization;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
private SchemaContext schemaContext;
+ private DatastoreSnapshot restoreFromSnapshot;
+
/**
*/
- protected ShardManager(ClusterWrapper cluster, Configuration configuration,
- DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch,
- PrimaryShardInfoFutureCache primaryShardInfoCache) {
-
- this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
- this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
- this.datastoreContextFactory = datastoreContextFactory;
- this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
+ protected ShardManager(Builder builder) {
+
+ this.cluster = builder.cluster;
+ this.configuration = builder.configuration;
+ this.datastoreContextFactory = builder.datastoreContextFactory;
+ this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
this.shardDispatcherPath =
new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
- this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
- this.primaryShardInfoCache = primaryShardInfoCache;
+ this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
+ this.primaryShardInfoCache = builder.primaryShardInfoCache;
+ this.restoreFromSnapshot = builder.restoreFromSnapshot;
peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
createLocalShards();
}
- public static Props props(
- final ClusterWrapper cluster,
- final Configuration configuration,
- final DatastoreContextFactory datastoreContextFactory,
- final CountDownLatch waitTillReadyCountdownLatch,
- final PrimaryShardInfoFutureCache primaryShardInfoCache) {
-
- Preconditions.checkNotNull(cluster, "cluster should not be null");
- Preconditions.checkNotNull(configuration, "configuration should not be null");
- Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
- Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
-
- return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContextFactory,
- waitTillReadyCountdownLatch, primaryShardInfoCache));
- }
-
@Override
public void postStop() {
LOG.info("Stopping ShardManager");
String memberName = this.cluster.getCurrentMemberName();
Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
+ Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
+ if(restoreFromSnapshot != null)
+ {
+ for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
+ shardSnapshots.put(snapshot.getName(), snapshot);
+ }
+ }
+
+ restoreFromSnapshot = null; // null out to GC
+
List<String> localShardActorNames = new ArrayList<>();
for(String shardName : memberShardNames){
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
Map<String, String> peerAddresses = getPeerAddresses(shardName);
localShardActorNames.add(shardId.toString());
localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
- newShardDatastoreContext(shardName), Shard.builder(), peerAddressResolver));
+ newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
+ shardSnapshots.get(shardName)), peerAddressResolver));
}
mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
private short leaderVersion;
private DatastoreContext datastoreContext;
- private final Shard.AbstractBuilder<?, ?> builder;
+ private Shard.AbstractBuilder<?, ?> builder;
private final ShardPeerAddressResolver addressResolver;
private ShardInformation(String shardName, ShardIdentifier shardId,
}
Props newProps(SchemaContext schemaContext) {
- return builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
+ Preconditions.checkNotNull(builder);
+ Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
schemaContext(schemaContext).props();
+ builder = null;
+ return props;
}
String getShardName() {
}
}
- private static class ShardManagerCreator implements Creator<ShardManager> {
- private static final long serialVersionUID = 1L;
-
- final ClusterWrapper cluster;
- final Configuration configuration;
- final DatastoreContextFactory datastoreContextFactory;
- private final CountDownLatch waitTillReadyCountdownLatch;
- private final PrimaryShardInfoFutureCache primaryShardInfoCache;
-
- ShardManagerCreator(ClusterWrapper cluster, Configuration configuration,
- DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch,
- PrimaryShardInfoFutureCache primaryShardInfoCache) {
- this.cluster = cluster;
- this.configuration = configuration;
- this.datastoreContextFactory = datastoreContextFactory;
- this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
- this.primaryShardInfoCache = primaryShardInfoCache;
- }
-
- @Override
- public ShardManager create() throws Exception {
- return new ShardManager(cluster, configuration, datastoreContextFactory, waitTillReadyCountdownLatch,
- primaryShardInfoCache);
- }
- }
-
private static class OnShardInitialized {
private final Runnable replyRunnable;
private Cancellable timeoutSchedule;
return modules;
}
}
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private ClusterWrapper cluster;
+ private Configuration configuration;
+ private DatastoreContextFactory datastoreContextFactory;
+ private CountDownLatch waitTillReadyCountdownLatch;
+ private PrimaryShardInfoFutureCache primaryShardInfoCache;
+ private DatastoreSnapshot restoreFromSnapshot;
+ private volatile boolean sealed;
+
+ protected void checkSealed() {
+ Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
+ }
+
+ public Builder cluster(ClusterWrapper cluster) {
+ checkSealed();
+ this.cluster = cluster;
+ return this;
+ }
+
+ public Builder configuration(Configuration configuration) {
+ checkSealed();
+ this.configuration = configuration;
+ return this;
+ }
+
+ public Builder datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
+ checkSealed();
+ this.datastoreContextFactory = datastoreContextFactory;
+ return this;
+ }
+
+ public Builder waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
+ checkSealed();
+ this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+ return this;
+ }
+
+ public Builder primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
+ checkSealed();
+ this.primaryShardInfoCache = primaryShardInfoCache;
+ return this;
+ }
+
+ public Builder restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
+ checkSealed();
+ this.restoreFromSnapshot = restoreFromSnapshot;
+ return this;
+ }
+
+ public Props props() {
+ sealed = true;
+ Preconditions.checkNotNull(cluster, "cluster should not be null");
+ Preconditions.checkNotNull(configuration, "configuration should not be null");
+ Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
+ Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
+ Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
+ return Props.create(ShardManager.class, this);
+ }
+ }
}
private final Set<URI> validNamespaces;
private PruningDataTreeModification transaction;
private int size;
+ private final byte[] restoreFromSnapshot;
- ShardRecoveryCoordinator(ShardDataTree store, SchemaContext schemaContext, String shardName, Logger log) {
+ ShardRecoveryCoordinator(ShardDataTree store, SchemaContext schemaContext, byte[] restoreFromSnapshot,
+ String shardName, Logger log) {
this.store = Preconditions.checkNotNull(store);
+ this.restoreFromSnapshot = restoreFromSnapshot;
this.shardName = shardName;
this.log = log;
this.validNamespaces = NormalizedNodePruner.namespaces(schemaContext);
@Override
public byte[] getRestoreFromSnapshot() {
- // TODO Auto-generated method stub
- return null;
+ return restoreFromSnapshot;
}
}
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.FileOutputStream;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
Futures.addCallback(Futures.allAsList(configFuture, operFuture), new FutureCallback<List<DatastoreSnapshot>>() {
@Override
public void onSuccess(List<DatastoreSnapshot> snapshots) {
- saveSnapshotsToFile(new ArrayList<>(snapshots), input.getFilePath(), returnFuture);
+ saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture);
}
@Override
return returnFuture;
}
- private static void saveSnapshotsToFile(ArrayList<DatastoreSnapshot> snapshots, String fileName,
+ private static void saveSnapshotsToFile(DatastoreSnapshotList snapshots, String fileName,
SettableFuture<RpcResult<Void>> returnFuture) {
try(FileOutputStream fos = new FileOutputStream(fileName)) {
SerializationUtils.serialize(snapshots, fos);
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Stores a list of DatastoreSnapshot instances.
+ */
+public class DatastoreSnapshotList extends ArrayList<DatastoreSnapshot> {
+ private static final long serialVersionUID = 1L;
+
+ public DatastoreSnapshotList() {
+ }
+
+ public DatastoreSnapshotList(List<DatastoreSnapshot> snapshots) {
+ super(snapshots);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.osgi;
+
+import org.opendaylight.controller.cluster.datastore.DatastoreSnapshotRestore;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+
+/**
+ * Activator for the bundle.
+ *
+ * @author Thomas Pantelis
+ */
+public class Activator implements BundleActivator {
+ private static final String RESTORE_DIRECTORY_PATH = "./clustered-datastore-restore";
+
+ @Override
+ public void start(BundleContext context) {
+ DatastoreSnapshotRestore.createInstance(RESTORE_DIRECTORY_PATH);
+ }
+
+ @Override
+ public void stop(BundleContext context) {
+ DatastoreSnapshotRestore.removeInstance();
+ }
+}
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreSnapshotRestore;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
import org.osgi.framework.BundleContext;
.build();
return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(),
- datastoreContext, getConfigActorSystemProviderDependency().getActorSystem(), bundleContext);
+ datastoreContext, DatastoreSnapshotRestore.instance().getAndRemove(datastoreContext.getDataStoreType()),
+ getConfigActorSystemProviderDependency().getActorSystem(), bundleContext);
}
public void setBundleContext(BundleContext bundleContext) {
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreSnapshotRestore;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
import org.osgi.framework.BundleContext;
.build();
return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(),
- datastoreContext, getOperationalActorSystemProviderDependency().getActorSystem(), bundleContext);
+ datastoreContext, DatastoreSnapshotRestore.instance().getAndRemove(datastoreContext.getDataStoreType()),
+ getOperationalActorSystemProviderDependency().getActorSystem(), bundleContext);
}
public void setBundleContext(BundleContext bundleContext) {
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import org.apache.commons.lang3.SerializationUtils;
+import org.junit.After;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
+
+/**
+ * Unit tests for DatastoreSnapshotRestore.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreSnapshotRestoreTest {
+ String restoreDirectoryPath = "target/DatastoreSnapshotRestoreTest-" + System.nanoTime();
+ File restoreDirectoryFile = new File(restoreDirectoryPath);
+ File backupFile = new File(restoreDirectoryFile, "backup");
+
+ @After
+ public void tearDown() {
+ backupFile.delete();
+ restoreDirectoryFile.delete();
+ }
+
+ @Test
+ public void test() throws Exception {
+ assertTrue("Failed to mkdir " + restoreDirectoryPath, restoreDirectoryFile.mkdirs());
+
+ List<ShardSnapshot> shardSnapshots = new ArrayList<>();
+ shardSnapshots.add(new ShardSnapshot("cars", new byte[]{1,2}));
+ shardSnapshots.add(new ShardSnapshot("people", new byte[]{3,4}));
+ DatastoreSnapshot configSnapshot = new DatastoreSnapshot("config", null, shardSnapshots );
+
+ shardSnapshots = new ArrayList<>();
+ shardSnapshots.add(new ShardSnapshot("cars", new byte[]{5,6}));
+ shardSnapshots.add(new ShardSnapshot("people", new byte[]{7,8}));
+ shardSnapshots.add(new ShardSnapshot("bikes", new byte[]{9,0}));
+ DatastoreSnapshot operSnapshot = new DatastoreSnapshot("oper", null, shardSnapshots );
+
+ DatastoreSnapshotList snapshotList = new DatastoreSnapshotList();
+ snapshotList.add(configSnapshot);
+ snapshotList.add(operSnapshot);
+
+ File backupFile = new File(restoreDirectoryFile, "backup");
+ try(FileOutputStream fos = new FileOutputStream(backupFile)) {
+ SerializationUtils.serialize(snapshotList, fos);
+ }
+
+ DatastoreSnapshotRestore.createInstance(restoreDirectoryPath);
+
+ verifySnapshot(configSnapshot, DatastoreSnapshotRestore.instance().getAndRemove("config"));
+ verifySnapshot(operSnapshot, DatastoreSnapshotRestore.instance().getAndRemove("oper"));
+
+ assertNull("DatastoreSnapshot was not removed", DatastoreSnapshotRestore.instance().getAndRemove("config"));
+
+ assertFalse(backupFile + " was not deleted", backupFile.exists());
+
+ DatastoreSnapshotRestore.removeInstance();
+ DatastoreSnapshotRestore.createInstance("target/does-not-exist");
+ assertNull("Expected null DatastoreSnapshot", DatastoreSnapshotRestore.instance().getAndRemove("config"));
+ assertNull("Expected null DatastoreSnapshot", DatastoreSnapshotRestore.instance().getAndRemove("oper"));
+ }
+
+ private void verifySnapshot(DatastoreSnapshot expected, DatastoreSnapshot actual) {
+ assertNotNull("DatastoreSnapshot is null", actual);
+ assertEquals("getType", expected.getType(), actual.getType());
+ assertTrue("ShardManager snapshots don't match", Objects.deepEquals(expected.getShardManagerSnapshot(),
+ actual.getShardManagerSnapshot()));
+ assertEquals("ShardSnapshots size", expected.getShardSnapshots().size(), actual.getShardSnapshots().size());
+ for(int i = 0; i < expected.getShardSnapshots().size(); i++) {
+ assertEquals("ShardSnapshot " + (i + 1) + " name", expected.getShardSnapshots().get(i).getName(),
+ actual.getShardSnapshots().get(i).getName());
+ assertArrayEquals("ShardSnapshot " + (i + 1) + " snapshot", expected.getShardSnapshots().get(i).getSnapshot(),
+ actual.getShardSnapshots().get(i).getSnapshot());
+ }
+ }
+}
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
cleanup(dataStore);
}};
}
+
+ @Test
+ public void testRestoreFromDatastoreSnapshot() throws Exception{
+ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
+ String name = "transactionIntegrationTest";
+
+ ContainerNode carsNode = CarsModel.newCarsNode(CarsModel.newCarsMapNode(
+ CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
+ CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
+
+ ShardDataTree dataTree = new ShardDataTree(SchemaContextHelper.full());
+ AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
+ NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree.getDataTree(),
+ YangInstanceIdentifier.builder().build());
+
+ Snapshot carsSnapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(root),
+ Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
+
+ NormalizedNode<?, ?> peopleNode = PeopleModel.create();
+ dataTree = new ShardDataTree(SchemaContextHelper.full());
+ AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
+ root = AbstractShardTest.readStore(dataTree.getDataTree(), YangInstanceIdentifier.builder().build());
+
+ Snapshot peopleSnapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(root),
+ Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
+
+ restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
+ new DatastoreSnapshot.ShardSnapshot("cars",
+ org.apache.commons.lang3.SerializationUtils.serialize(carsSnapshot)),
+ new DatastoreSnapshot.ShardSnapshot("people",
+ org.apache.commons.lang3.SerializationUtils.serialize(peopleSnapshot))));
+
+ DistributedDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
+ true, "cars", "people");
+
+ DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
+
+ Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("Data node", carsNode, optional.get());
+
+ optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("Data node", peopleNode, optional.get());
+
+ cleanup(dataStore);
+ }};
+ }
}
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
public class IntegrationTestKit extends ShardTestKit {
DatastoreContext.Builder datastoreContextBuilder;
+ DatastoreSnapshot restoreFromSnapshot;
public IntegrationTestKit(ActorSystem actorSystem, Builder datastoreContextBuilder) {
super(actorSystem);
Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
- DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, mockContextFactory);
+ DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, mockContextFactory,
+ restoreFromSnapshot);
dataStore.onGlobalContextUpdated(schemaContext);
}
private Props newShardMgrProps(Configuration config) {
- return ShardManager.props(new MockClusterWrapper(), config,
- newDatastoreContextFactory(datastoreContextBuilder.build()), ready, primaryShardInfoCache);
+ return ShardManager.builder().cluster(new MockClusterWrapper()).configuration(config).
+ datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())).
+ waitTillReadyCountdownLatch(ready).primaryShardInfoCache(primaryShardInfoCache).props();
}
private Props newPropsShardMgrWithMockShardActor() {
private static final long serialVersionUID = 1L;
@Override
public ShardManager create() throws Exception {
- return new ForwardingShardManager(clusterWrapper, config, newDatastoreContextFactory(
- datastoreContextBuilder.build()), ready, name, shardActor, primaryShardInfoCache);
+ return new ForwardingShardManager(ShardManager.builder().cluster(clusterWrapper).configuration(config).
+ datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())).
+ waitTillReadyCountdownLatch(ready).primaryShardInfoCache(primaryShardInfoCache), name, shardActor);
}
};
private static final long serialVersionUID = 1L;
@Override
public ShardManager create() throws Exception {
- return new ShardManager(new MockClusterWrapper(), mockConfig, mockFactory, ready, primaryShardInfoCache) {
+ return new ShardManager(ShardManager.builder().cluster(new MockClusterWrapper()).configuration(mockConfig).
+ datastoreContextFactory(mockFactory).waitTillReadyCountdownLatch(ready).
+ primaryShardInfoCache(primaryShardInfoCache)) {
@Override
protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
@Test
public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
- final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
- newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready,
- primaryShardInfoCache);
- final TestActorRef<ShardManager> shardManager =
- TestActorRef.create(getSystem(), persistentProps);
+ final TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
ShardManager shardManagerActor = shardManager.underlyingActor();
- shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+ shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
RaftState.Follower.name(), RaftState.Leader.name()));
assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
@Test
public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
- final Props persistentProps = newShardMgrProps();
- final TestActorRef<ShardManager> shardManager =
- TestActorRef.create(getSystem(), persistentProps);
+ final TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
ShardManager shardManagerActor = shardManager.underlyingActor();
- shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+ String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManagerActor.onReceiveCommand(new RoleChangeNotification(shardId,
RaftState.Follower.name(), RaftState.Candidate.name()));
assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
// Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
- shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
+ shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(
+ true, shardId));
assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
}
@Test
public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
- final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
- newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready,
- primaryShardInfoCache);
+ final Props persistentProps = newShardMgrProps();
final TestActorRef<ShardManager> shardManager =
TestActorRef.create(getSystem(), persistentProps);
+ String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
ShardManager shardManagerActor = shardManager.underlyingActor();
- shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+ shardManagerActor.onReceiveCommand(new RoleChangeNotification(shardId,
RaftState.Candidate.name(), RaftState.Follower.name()));
// Initially will be false
assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
// Send status true will make sync status true
- shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
+ shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
// Send status false will make sync status false
- shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
+ shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
@Test
public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
- final Props persistentProps = ShardManager.props(new MockClusterWrapper(),
- new MockConfiguration() {
- @Override
- public List<String> getMemberShardNames(String memberName) {
- return Arrays.asList("default", "astronauts");
- }
- },
- newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready,
- primaryShardInfoCache);
+ final Props persistentProps = newShardMgrProps(new MockConfiguration() {
+ @Override
+ public List<String> getMemberShardNames(String memberName) {
+ return Arrays.asList("default", "astronauts");
+ }
+ });
+
final TestActorRef<ShardManager> shardManager =
TestActorRef.create(getSystem(), persistentProps);
assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
// Make default shard leader
- shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+ String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManagerActor.onReceiveCommand(new RoleChangeNotification(defaultShardId,
RaftState.Follower.name(), RaftState.Leader.name()));
// default = Leader, astronauts is unknown so sync status remains false
assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
// Make astronauts shard leader as well
- shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
+ String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
+ shardManagerActor.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
RaftState.Follower.name(), RaftState.Leader.name()));
// Now sync status should be true
assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
// Make astronauts a Follower
- shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
+ shardManagerActor.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
RaftState.Leader.name(), RaftState.Follower.name()));
// Sync status is not true
assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
// Make the astronauts follower sync status true
- shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
+ shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
// Sync status is now true
assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
TestShardManager(String shardMrgIDSuffix) {
- super(new MockClusterWrapper(), new MockConfiguration(),
- newDatastoreContextFactory(DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()),
- ready, new PrimaryShardInfoFutureCache());
+ super(ShardManager.builder().cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).
+ datastoreContextFactory(newDatastoreContextFactory(DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build())).
+ waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache()));
}
@Override
private final ActorRef shardActor;
private final String name;
- protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
- DatastoreContextFactory factory, CountDownLatch waitTillReadyCountdownLatch, String name,
- ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
- super(cluster, configuration, factory, waitTillReadyCountdownLatch, primaryShardInfoCache);
+ public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) {
+ super(builder);
this.shardActor = shardActor;
this.name = name;
}
@Test
public void testAppendRecoveredLogEntryDataTreeCandidatePayload(){
- final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree, peopleSchemaContext, "foobar", LoggerFactory.getLogger("foo"));
+ final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
+ peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
coordinator.startLogRecoveryBatch(10);
try {
coordinator.appendRecoveredLogEntry(DataTreeCandidatePayload.create(createCar()));
@Test
public void testAppendRecoveredLogEntryModificationPayload() throws IOException {
- final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree, peopleSchemaContext, "foobar", LoggerFactory.getLogger("foo"));
+ final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
+ peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
coordinator.startLogRecoveryBatch(10);
try {
final MutableCompositeModification modification = new MutableCompositeModification((short) 1);
@Test
public void testAppendRecoveredLogEntryCompositeModificationPayload() throws IOException {
- final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree, peopleSchemaContext, "foobar", LoggerFactory.getLogger("foo"));
+ final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
+ peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
coordinator.startLogRecoveryBatch(10);
try {
final MutableCompositeModification modification = new MutableCompositeModification((short) 1);
@Test
public void testAppendRecoveredLogEntryCompositeModificationByteStringPayload() throws IOException {
- final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree, peopleSchemaContext, "foobar", LoggerFactory.getLogger("foo"));
+ final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
+ peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
coordinator.startLogRecoveryBatch(10);
try {
final MutableCompositeModification modification = new MutableCompositeModification((short) 1);
@Test
public void testApplyRecoverySnapshot(){
- final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree , peopleSchemaContext, "foobar", LoggerFactory.getLogger("foo"));
+ final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
+ peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
coordinator.startLogRecoveryBatch(10);
coordinator.applyRecoverySnapshot(createSnapshot());
@Test
public void testApplyCurrentLogRecoveryBatch(){
- final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree, peopleSchemaContext, "foobar", LoggerFactory.getLogger("foo"));
+ final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
+ peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
coordinator.startLogRecoveryBatch(10);
try {
Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
- dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration, mockContextFactory);
+ dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration, mockContextFactory, null);
dataStore.onGlobalContextUpdated(SchemaContextHelper.entityOwners());
}
import java.math.BigInteger;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
}
public static NormalizedNode<?, ?> createEmptyCarsList(){
+ return newCarsNode(newCarsMapNode());
+ }
- // Create a list builder
- CollectionNodeBuilder<MapEntryNode, MapNode> cars =
- ImmutableMapNodeBuilder.create().withNodeIdentifier(
- new YangInstanceIdentifier.NodeIdentifier(
- CAR_QNAME));
+ public static ContainerNode newCarsNode(MapNode carsList) {
+ return ImmutableContainerNodeBuilder.create().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(
+ BASE_QNAME)).withChild(carsList).build();
+ }
- return ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(BASE_QNAME))
- .withChild(cars.build())
- .build();
+ public static MapNode newCarsMapNode(MapEntryNode... carEntries) {
+ CollectionNodeBuilder<MapEntryNode, MapNode> builder = ImmutableMapNodeBuilder.create().
+ withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CAR_QNAME));
+ for(MapEntryNode e: carEntries) {
+ builder.withChild(e);
+ }
+ return builder.build();
}
public static NormalizedNode<?, ?> emptyContainer(){