import javax.annotation.Nullable;
/**
- * Proposal for how a key based semaphore provider should look like.
- * <ul>
- * <li>thread safe</li>
- * <li>garbage-collect unused semaphores</li>
- * <li>for the same key there must be always only one semaphore available</li>
- * </ul>
- *
- *
- * usage:
- * <pre>
- * final Semaphore guard = semaphoreKeeper.summonGuard(key);
- * guard.acquire();
- * // guard protected logic ...
- * guard.release();
- * </pre>
- *
+ * Key based semaphore provider.
+ * For the same key there is always only one semaphore available. Unused semaphores are garbage-collect.
* @param <K> key type
*/
public interface SemaphoreKeeper<K> {
/**
+ * Create or load semaphore for key from cache.
* @param key semaphore identifier
* @return new or existing semaphore for given key, for one key there is always only one semaphore available
*/
import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SyncPlanPushStrategyFlatBatchImpl;
import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.TableForwarder;
import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
-import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
final SyncReactor syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
final SyncReactor syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, reconciliationRegistry);
- final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry,
- new SemaphoreKeeperGuavaImpl<>(1, true));
- final SyncReactor syncReactorFutureZip = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool,
- new SemaphoreKeeperGuavaImpl<>(1, true));
+ final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry);
+ final SyncReactor syncReactorFutureZip = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
final SyncReactor reactor = new SyncReactorClusterDecorator(syncReactorFutureZip, deviceMastershipManager);
try {
return doSyncupInFuture(flowcapableNodePath, syncupEntry).get(10000, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
- LOG.warn("Syncup future timeout occured {}", nodeId.getValue(), e);
+ LOG.warn("Syncup future timeout occured {}", nodeId.getValue());
return Boolean.FALSE;
}
});
import java.util.concurrent.Semaphore;
import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
private final Map<InstanceIdentifier<FlowCapableNode>, SyncupEntry> compressionQueue = new HashMap<>();
- private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;
+ private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper =
+ new SemaphoreKeeperGuavaImpl<>(1, true);
- public SyncReactorFutureZipDecorator(final SyncReactor delegate,
- final ListeningExecutorService executorService,
- final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {
+ public SyncReactorFutureZipDecorator(final SyncReactor delegate, final ListeningExecutorService executorService) {
super(delegate, executorService);
- this.semaphoreKeeper = semaphoreKeeper;
}
public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);
private final SyncReactor delegate;
- private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;
+ private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper =
+ new SemaphoreKeeperGuavaImpl<>(1, true);
- public SyncReactorGuardDecorator(final SyncReactor delegate,
- final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {
+ public SyncReactorGuardDecorator(final SyncReactor delegate) {
this.delegate = delegate;
- this.semaphoreKeeper = semaphoreKeeper;
}
public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
}
private FutureCallback<Boolean> createSyncupCallback(final Semaphore guard,
- final long stampBeforeGuard,
- final long stampAfterGuard,
- final NodeId nodeId) {
+ final long stampBeforeGuard,
+ final long stampAfterGuard,
+ final NodeId nodeId) {
return new FutureCallback<Boolean>() {
@Override
public void onSuccess(@Nullable final Boolean result) {
* @param gatherUpdates check content of pending item if present on device (and create update task eventually)
* @return list of safe synchronization steps
*/
- public static ItemSyncBox<Flow> resolveFlowDiffsInTable(final List<Flow> flowsConfigured,
+ private static ItemSyncBox<Flow> resolveFlowDiffsInTable(final List<Flow> flowsConfigured,
final Map<FlowDescriptor, Flow> flowOperationalMap,
final boolean gatherUpdates) {
final ItemSyncBox<Flow> flowsSyncBox = new ItemSyncBox<>();
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Key-based semaphore provider.
- */
public class SemaphoreKeeperGuavaImpl<K> implements SemaphoreKeeper<K> {
private static final Logger LOG = LoggerFactory.getLogger(SemaphoreKeeperGuavaImpl.class);
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
-import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
@Before
public void setUp() {
- final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper = new SemaphoreKeeperGuavaImpl<>(1, true);
final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setDaemon(false)
.setNameFormat("frsync-test-%d")
.setUncaughtExceptionHandler((thread, e) -> LOG.error("Uncaught exception {}", thread, e))
.build());
syncThreadPool = MoreExecutors.listeningDecorator(executorService);
- reactor = new SyncReactorFutureZipDecorator(delegate, syncThreadPool, semaphoreKeeper);
+ reactor = new SyncReactorFutureZipDecorator(delegate, syncThreadPool);
fcNodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID))
.augmentation(FlowCapableNode.class);
}
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
-import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
-import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
@Before
public void setUp() throws Exception {
- final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper = new SemaphoreKeeperGuavaImpl<>(1, true);
- reactor = new SyncReactorGuardDecorator(delegate, semaphoreKeeper);
+ reactor = new SyncReactorGuardDecorator(delegate);
InstanceIdentifier<Node> nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
fcNodePath = nodePath.augmentation(FlowCapableNode.class);