/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.raft.utils;
import akka.dispatch.Futures;
import akka.japi.Option;
import akka.persistence.SelectedSnapshot;
import akka.persistence.SnapshotMetadata;
import akka.persistence.SnapshotSelectionCriteria;
import akka.persistence.snapshot.japi.SnapshotStore;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
/**
 * An akka SnapshotStore implementation that stores data in memory. This is intended for testing.
 *
 * @author Thomas Pantelis
 */
36 public class InMemorySnapshotStore extends SnapshotStore {
38 static final Logger LOG = LoggerFactory.getLogger(InMemorySnapshotStore.class);
40 private static Map<String, List<StoredSnapshot>> snapshots = new ConcurrentHashMap<>();
41 private static final Map<String, CountDownLatch> snapshotSavedLatches = new ConcurrentHashMap<>();
43 public static void addSnapshot(String persistentId, Object snapshot) {
44 List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
46 if(snapshotList == null) {
47 snapshotList = new ArrayList<>();
48 snapshots.put(persistentId, snapshotList);
51 synchronized (snapshotList) {
52 snapshotList.add(new StoredSnapshot(new SnapshotMetadata(persistentId, snapshotList.size(),
53 System.currentTimeMillis()), snapshot));
57 @SuppressWarnings("unchecked")
58 public static <T> List<T> getSnapshots(String persistentId, Class<T> type) {
59 List<StoredSnapshot> stored = snapshots.get(persistentId);
61 return Collections.emptyList();
65 synchronized (stored) {
66 retList = Lists.newArrayListWithCapacity(stored.size());
67 for(StoredSnapshot s: stored) {
68 if(type.isInstance(s.data)) {
69 retList.add((T) s.data);
77 public static void clear() {
81 public static void addSnapshotSavedLatch(String persistenceId) {
82 snapshotSavedLatches.put(persistenceId, new CountDownLatch(1));
85 public static <T> T waitForSavedSnapshot(String persistenceId, Class<T> type) {
86 if(!Uninterruptibles.awaitUninterruptibly(snapshotSavedLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
87 throw new AssertionError("Snapshot was not saved");
90 return getSnapshots(persistenceId, type).get(0);
94 public Future<Option<SelectedSnapshot>> doLoadAsync(String persistenceId,
95 SnapshotSelectionCriteria snapshotSelectionCriteria) {
96 List<StoredSnapshot> snapshotList = snapshots.get(persistenceId);
97 if(snapshotList == null){
98 return Futures.successful(Option.<SelectedSnapshot>none());
101 synchronized(snapshotList) {
102 for(int i = snapshotList.size() - 1; i >= 0; i--) {
103 StoredSnapshot snapshot = snapshotList.get(i);
104 if(matches(snapshot, snapshotSelectionCriteria)) {
105 return Futures.successful(Option.some(new SelectedSnapshot(snapshot.metadata,
111 return Futures.successful(Option.<SelectedSnapshot>none());
114 private boolean matches(StoredSnapshot snapshot, SnapshotSelectionCriteria criteria) {
115 return snapshot.metadata.sequenceNr() <= criteria.maxSequenceNr() &&
116 snapshot.metadata.timestamp() <= criteria.maxTimestamp();
120 public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
121 List<StoredSnapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
123 if(snapshotList == null){
124 snapshotList = new ArrayList<>();
125 snapshots.put(snapshotMetadata.persistenceId(), snapshotList);
127 synchronized (snapshotList) {
128 snapshotList.add(new StoredSnapshot(snapshotMetadata, o));
131 CountDownLatch latch = snapshotSavedLatches.get(snapshotMetadata.persistenceId());
136 return Futures.successful(null);
140 public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
144 public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
145 List<StoredSnapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
147 if(snapshotList == null){
151 synchronized (snapshotList) {
152 for(int i=0;i<snapshotList.size(); i++){
153 StoredSnapshot snapshot = snapshotList.get(i);
154 if(snapshotMetadata.equals(snapshot.metadata)){
155 snapshotList.remove(i);
163 public void doDelete(String persistentId, SnapshotSelectionCriteria snapshotSelectionCriteria)
165 LOG.trace("doDelete: persistentId {}: maxSequenceNr: {}: maxTimestamp {}", persistentId,
166 snapshotSelectionCriteria.maxSequenceNr(), snapshotSelectionCriteria.maxTimestamp());
168 List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
169 if(snapshotList == null){
173 synchronized (snapshotList) {
174 Iterator<StoredSnapshot> iter = snapshotList.iterator();
175 while(iter.hasNext()) {
176 StoredSnapshot s = iter.next();
177 if(matches(s, snapshotSelectionCriteria)) {
178 LOG.trace("Deleting snapshot for sequenceNr: {}, timestamp: {}",
179 s.metadata.sequenceNr(), s.metadata.timestamp());
187 private static class StoredSnapshot {
188 private final SnapshotMetadata metadata;
189 private final Object data;
191 private StoredSnapshot(SnapshotMetadata metadata, Object data) {
192 this.metadata = metadata;