/* * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.groupbasedpolicy.sxp.ep.provider.impl.dao; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.groupbasedpolicy.sxp.ep.provider.impl.DSAsyncDao; import org.opendaylight.groupbasedpolicy.sxp.ep.provider.impl.MasterDatabaseBindingListener; import org.opendaylight.groupbasedpolicy.sxp.ep.provider.impl.ReadableAsyncByKey; import org.opendaylight.groupbasedpolicy.sxp.ep.provider.impl.SimpleCachedDao; import org.opendaylight.groupbasedpolicy.sxp.ep.provider.impl.util.SxpListenerUtil; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpPrefix; import org.opendaylight.yang.gen.v1.urn.opendaylight.sxp.database.rev160308.MasterDatabaseFields; import org.opendaylight.yang.gen.v1.urn.opendaylight.sxp.database.rev160308.Sgt; import org.opendaylight.yang.gen.v1.urn.opendaylight.sxp.database.rev160308.master.database.fields.MasterDatabaseBinding; import org.opendaylight.yang.gen.v1.urn.opendaylight.sxp.node.rev160308.SxpDatabasesFields; import org.opendaylight.yang.gen.v1.urn.opendaylight.sxp.node.rev160308.SxpNodeIdentity; import org.opendaylight.yang.gen.v1.urn.opendaylight.sxp.node.rev160308.network.topology.topology.node.SxpDomains; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Purpose: general dao for EndPoint templates */ public class MasterDatabaseBindingDaoImpl implements DSAsyncDao, ReadableAsyncByKey { private static final Logger LOG = LoggerFactory.getLogger(MasterDatabaseBindingDaoImpl.class); private static final ListenableFuture> READ_FUTURE_ABSENT = Futures.immediateFuture(Optional.absent()); private final DataBroker dataBroker; private final SimpleCachedDao cachedDao; public MasterDatabaseBindingDaoImpl(final DataBroker dataBroker, final SimpleCachedDao cachedDao) { this.dataBroker = dataBroker; this.cachedDao = cachedDao; } @Override public ListenableFuture> read(@Nonnull final IpPrefix key) { final Optional cachedMasterDatabaseBinding = lookup(cachedDao, key); if (cachedMasterDatabaseBinding.isPresent()) { return Futures.immediateFuture(cachedMasterDatabaseBinding); } else if (!cachedDao.isEmpty()) { return READ_FUTURE_ABSENT; } else { final ListenableFuture cacheUpdatedFt = updateCache(); return Futures.transform(cacheUpdatedFt, new Function>() { @Nullable @Override public Optional apply(@Nullable final Void input) { return lookup(cachedDao, key); } }); } } private ListenableFuture updateCache() { final ReadOnlyTransaction rTx = dataBroker.newReadOnlyTransaction(); final CheckedFuture, ReadFailedException> read = rTx.read(LogicalDatastoreType.CONFIGURATION, buildReadPath(null)); Futures.addCallback(read, SxpListenerUtil.createTxCloseCallback(rTx)); return Futures.transform(read, new Function, Void>() { @Nullable @Override public Void apply(@Nullable final Optional input) { if (input.isPresent()) { // clean cache cachedDao.invalidateCache(); for (Node node : input.get().getNode()) { java.util.Optional.ofNullable(node.getAugmentation(SxpNodeIdentity.class)) .map(SxpNodeIdentity::getSxpDomains) .map(SxpDomains::getSxpDomain) .ifPresent((sxpDomain) -> { final List masterDBBindings = sxpDomain.stream() .map(SxpDatabasesFields::getMasterDatabase) .filter(masterDb -> masterDb != null) .map(MasterDatabaseFields::getMasterDatabaseBinding) .filter(binding -> binding != null) .flatMap(Collection::stream) .collect(Collectors.toList()); for (MasterDatabaseBinding masterDBItem : masterDBBindings) { // update all final MasterDatabaseBinding previousValue = cachedDao.update(masterDBItem.getIpPrefix(), masterDBItem); if (previousValue != null) { LOG.warn("updated key already obtained: [node:{}, sgt:{}]", node.getNodeId().getValue(), masterDBItem.getSecurityGroupTag()); } } }); } } else { LOG.warn("failed to update cache of SxpMasterDB - no data"); } return null; } }); } private InstanceIdentifier buildReadPath(final Sgt key) { return MasterDatabaseBindingListener.SXP_TOPOLOGY_PATH; } private Optional lookup(final SimpleCachedDao cachedDao, final IpPrefix key) { return cachedDao.find(key); } @Override public ListenableFuture> readBy(@Nonnull final Sgt specialKey) { final ListenableFuture cacheUpdatedFt; if (!cachedDao.isEmpty()) { cacheUpdatedFt = Futures.immediateFuture(null); } else { cacheUpdatedFt = updateCache(); } return Futures.transform(cacheUpdatedFt, new Function>() { @Nullable @Override public Collection apply(@Nullable final Void input) { final Collection foundGroups = new ArrayList<>(); for (MasterDatabaseBinding masterDBItem : cachedDao.values()) { if (masterDBItem.getSecurityGroupTag().equals(specialKey)) { foundGroups.add(masterDBItem); } } return foundGroups; } }); } }