return new Function<DataReader<P, D>, D>() {
@Override
public D apply(DataReader<P, D> input) {
- return input.readConfigurationData(path);
+ return input.readOperationalData(path);
}
};
}
package org.opendaylight.controller.sal.core.api.mount;
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
+import org.opendaylight.controller.sal.core.api.data.DataProviderService;
import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
-public interface MountProvisionInstance extends MountInstance, NotificationPublishService, RpcProvisionRegistry {
+public interface MountProvisionInstance extends //
+ MountInstance,//
+ NotificationPublishService, //
+ RpcProvisionRegistry,//
+ DataProviderService {
}
*/
package org.opendaylight.controller.sal.core.api.mount;
+import org.opendaylight.controller.sal.core.api.BrokerService;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-public interface MountService {
+public interface MountService extends BrokerService {
MountInstance getMountPoint(InstanceIdentifier path);
}
import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
import org.opendaylight.controller.sal.core.api.data.DataProviderService;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
+import org.opendaylight.controller.sal.core.api.mount.MountService;
import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
private ServiceRegistration<DataProviderService> dataProviderReg;
private SchemaServiceImpl schemaService;
private DataBrokerImpl dataService;
+ private MountPointManagerImpl mountService;
+ private ServiceRegistration<MountService> mountReg;
+ private ServiceRegistration<MountProvisionService> mountProviderReg;
@Override
public void start(BundleContext context) throws Exception {
dataReg = context.registerService(DataBrokerService.class, dataService, emptyProperties);
dataProviderReg = context.registerService(DataProviderService.class, dataService, emptyProperties);
+ mountService = new MountPointManagerImpl();
+ mountService.setDataBroker(dataService);
+ mountReg = context.registerService(MountService.class, mountService, emptyProperties);
+ mountProviderReg = context.registerService(MountProvisionService.class, mountService, emptyProperties);
}
@Override
package org.opendaylight.controller.sal.dom.broker;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
import org.opendaylight.controller.md.sal.common.api.data.DataReader;
+import org.opendaylight.controller.sal.common.DataStoreIdentifier;
import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
import org.opendaylight.controller.sal.core.api.data.DataChangeListener;
import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.core.api.data.DataValidator;
import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
import org.opendaylight.controller.sal.dom.broker.impl.DataReaderRouter;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.InstanceIdentifierBuilder;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
public class MountPointImpl implements MountProvisionInstance {
- final RpcRouter rpcs;
- final DataReaderRouter dataReader;
- final NotificationRouter notificationRouter;
+ private final RpcRouter rpcs;
+ private final DataReaderRouter dataReader;
+ private final NotificationRouter notificationRouter;
+ private final DataReader<InstanceIdentifier,CompositeNode> readWrapper;
+
+
+ private final InstanceIdentifier mountPath;
public MountPointImpl(InstanceIdentifier path) {
+ this.mountPath = path;
rpcs = new RpcRouterImpl("");
dataReader = new DataReaderRouter();
notificationRouter = new NotificationRouterImpl();
+ readWrapper = new ReadWrapper();
+ }
+
+ public InstanceIdentifier getMountPath() {
+ return mountPath;
+ }
+
+ public DataReader<InstanceIdentifier, CompositeNode> getReadWrapper() {
+ return readWrapper;
}
@Override
@Override
public void sendNotification(CompositeNode notification) {
publish(notification);
+ }
+
+ @Override
+ public Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> registerCommitHandler(
+ InstanceIdentifier path, DataCommitHandler<InstanceIdentifier, CompositeNode> commitHandler) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void removeRefresher(DataStoreIdentifier store, DataRefresher refresher) {
+ // NOOP
+ }
+
+ @Override
+ public void addRefresher(DataStoreIdentifier store, DataRefresher refresher) {
+ // NOOP
+ }
+
+ @Override
+ public void addValidator(DataStoreIdentifier store, DataValidator validator) {
+ // NOOP
+ }
+ @Override
+ public void removeValidator(DataStoreIdentifier store, DataValidator validator) {
+ // NOOP
+ }
+
+ class ReadWrapper implements DataReader<InstanceIdentifier, CompositeNode> {
+
+
+ private InstanceIdentifier shortenPath(InstanceIdentifier path) {
+ InstanceIdentifier ret = null;
+ if(mountPath.contains(path)) {
+ List<PathArgument> newArgs = path.getPath().subList(mountPath.getPath().size(), path.getPath().size());
+ ret = new InstanceIdentifier(newArgs);
+ }
+ return ret;
+ }
+
+ @Override
+ public CompositeNode readConfigurationData(InstanceIdentifier path) {
+ InstanceIdentifier newPath = shortenPath(path);
+ if(newPath == null) {
+ return null;
+ }
+ return MountPointImpl.this.readConfigurationData(newPath);
+ }
+ @Override
+ public CompositeNode readOperationalData(InstanceIdentifier path) {
+ InstanceIdentifier newPath = shortenPath(path);
+ if(newPath == null) {
+ return null;
+ }
+ return MountPointImpl.this.readOperationalData(newPath);
+ }
}
}
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.ConcurrentHashMap
import static com.google.common.base.Preconditions.*;
+import org.opendaylight.controller.sal.core.api.data.DataProviderService
class MountPointManagerImpl implements MountProvisionService {
+ @Property
+ DataProviderService dataBroker;
+
ConcurrentMap<InstanceIdentifier,MountPointImpl> mounts = new ConcurrentHashMap();
override createMountPoint(InstanceIdentifier path) {
checkState(!mounts.containsKey(path),"Mount already created");
val mount = new MountPointImpl(path);
+ registerMountPoint(mount);
mounts.put(path,mount);
+ return mount;
+ }
+
+ def registerMountPoint(MountPointImpl impl) {
+ dataBroker?.registerConfigurationReader(impl.mountPath,impl.readWrapper);
+ dataBroker?.registerOperationalReader(impl.mountPath,impl.readWrapper);
+
}
override getMountPoint(InstanceIdentifier path) {
mounts.get(path);
}
-
-
}
import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
import org.opendaylight.yangtools.yang.data.api.CompositeNode
+import org.opendaylight.controller.md.sal.common.api.data.DataReader
class DataReaderRouter extends AbstractDataReadRouter<InstanceIdentifier, CompositeNode> {
--- /dev/null
+package org.opendaylight.controller.sal.dom.broker.osgi;
+
+import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.osgi.framework.ServiceReference;
+
+public class MountProviderServiceProxy extends AbstractBrokerServiceProxy<MountProvisionService> implements MountProvisionService{
+
+
+ public MountProviderServiceProxy(ServiceReference<MountProvisionService> ref, MountProvisionService delegate) {
+ super(ref, delegate);
+ }
+
+ public MountProvisionInstance getMountPoint(InstanceIdentifier path) {
+ return getDelegate().getMountPoint(path);
+ }
+
+ public MountProvisionInstance createMountPoint(InstanceIdentifier path) {
+ return getDelegate().createMountPoint(path);
+ }
+
+ public MountProvisionInstance createOrGetMountPoint(InstanceIdentifier path) {
+ return getDelegate().createOrGetMountPoint(path);
+ }
+}
import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService
import org.opendaylight.controller.sal.core.api.notify.NotificationService
import org.opendaylight.controller.sal.core.api.model.SchemaService
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
class ProxyFactory {
new NotificationServiceProxy(ref as ServiceReference<NotificationService>, service);
}
+ private static def dispatch createProxyImpl(ServiceReference<?> ref, MountProvisionService service) {
+ new MountProviderServiceProxy(ref as ServiceReference<MountProvisionService>, service);
+ }
+
+
private static def dispatch createProxyImpl(ServiceReference<?> ref, SchemaService service) {
new SchemaServiceProxy(ref as ServiceReference<SchemaService>, service);
}
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
import org.opendaylight.controller.sal.core.api.AbstractProvider;
import org.osgi.framework.BundleContext;
public class Activator extends AbstractProvider {
-
- ZeroMqRpcRouter router;
-
- @Override
- public void onSessionInitiated(ProviderSession session) {
- router = ZeroMqRpcRouter.getInstance();
- router.setBrokerSession(session);
- router.start();
- }
-
- @Override
- protected void stopImpl(BundleContext context) {
- router.stop();
- }
+
+ ZeroMqRpcRouter router;
+
+ @Override
+ public void onSessionInitiated(ProviderSession session) {
+ router = ZeroMqRpcRouter.getInstance();
+ router.setBrokerSession(session);
+ router.start();
+ }
+
+ @Override
+ protected void stopImpl(BundleContext context) {
+ router.stop();
+ }
}
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import java.io.Serializable;
-/**
- * Created with IntelliJ IDEA.
- * User: abhishk2
- * Date: 10/25/13
- * Time: 12:32 PM
- * To change this template use File | Settings | File Templates.
- */
public class RpcRequestImpl implements RpcRouter.RpcRequest<QName, QName, InstanceIdentifier, Object>,Serializable {
private RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier;
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
import java.io.IOException;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
/**
private String pubIp = System.getProperty("pub.ip"); // other controller's ip
private String rpcPort = System.getProperty("rpc.port");// port on which RPC messages are received
+ private Logger _logger = LoggerFactory.getLogger(ZeroMqRpcRouter.class);
+
//Prevent instantiation
private ZeroMqRpcRouter() {
}
context = ZMQ.context(2);
publisher = context.socket(ZMQ.PUB);
int ret = publisher.bind("tcp://*:" + pubPort);
- System.out.println(Thread.currentThread().getName() + " Return(publish bind) :[" + ret + "]");
// serverPool = Executors.newSingleThreadExecutor();
serverPool = Executors.newCachedThreadPool();
handlersPool = Executors.newCachedThreadPool();
// Bind to publishing controller
subscriber = context.socket(ZMQ.SUB);
- subscriber.connect("tcp://" + pubIp + ":" + subPort);
- System.out.println(Thread.currentThread().getName() + "-Subscribing at[" + "tcp://"
- + pubIp + ":" + subPort + "]");
+ String pubAddress = "tcp://" + pubIp + ":" + subPort;
+ subscriber.connect(pubAddress);
+ _logger.debug("{} Subscribing at[{}]", Thread.currentThread().getName(), pubAddress);
//subscribe for announcements
//TODO: Message type would be changed. Update this
ZMQ.Poller poller = new ZMQ.Poller(2);
poller.register(replySocket, ZMQ.Poller.POLLIN);
poller.register(subscriber, ZMQ.Poller.POLLIN);
- System.out.println(Thread.currentThread().getName() + "-Start Polling");
//TODO: Add code to restart the thread after exception
while (!Thread.currentThread().isInterrupted()) {
* @throws ClassNotFoundException
*/
private void handleAnnouncement() throws IOException, ClassNotFoundException {
- System.out.println("\n" + Thread.currentThread().getName() + "-Received message");
+
+ _logger.info("Announcement received");
Message.MessageType topic = (MessageType) Message.deserialize(subscriber.recv());
- System.out.println("Topic:[" + topic + "]");
if (subscriber.hasReceiveMore()) {
try {
Message m = (Message) Message.deserialize(subscriber.recv());
- System.out.println(m);
+ _logger.debug("Announcement message [{}]", m);
+
// TODO: check on msg type or topic. Both
// should be same. Need to normalize.
if (Message.MessageType.ANNOUNCE == m.getType())
*/
private void handleRpcCall() throws InterruptedException, ExecutionException {
try {
- Message req = parseMessage(replySocket);
+ Message request = parseMessage(replySocket);
- System.out.println("Received RPC request [" + req + "]");
+ _logger.debug("Received rpc request [{}]", request);
// Call broker to process the message then reply
Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(
- (QName) req.getRoute().getType(), (CompositeNode) req.getPayload());
+ (QName) request.getRoute().getType(), (CompositeNode) request.getPayload());
RpcResult<CompositeNode> result = rpc.get();
Message response = new Message.MessageBuilder()
.type(MessageType.RESPONSE)
.sender(localIp + ":" + rpcPort)
- .route(req.getRoute())
+ .route(request.getRoute())
//.payload(result) TODO: enable and test
.build();
replySocket.send(Message.serialize(response));
- System.out.println("Sent RPC response [" + response + "]");
+ _logger.debug("Sent rpc response [{}]", response);
} catch (IOException ex) {
//TODO: handle exception and send error codes to caller
- System.out.println("Rpc request could not be handled" + ex);
+ ex.printStackTrace();
}
}
.payload(input.getPayload())
.build();
+ _logger.debug("Sending rpc request [{}]", requestMessage);
+
RpcReply<Object> reply = null;
try {
- System.out.println("\n\nRPC Request [" + requestMessage + "]");
requestSocket.send(Message.serialize(requestMessage));
- final Message resp = parseMessage(requestSocket);
+ final Message response = parseMessage(requestSocket);
- System.out.println("\n\nRPC Response [" + resp + "]");
+ _logger.debug("Received response [{}]", response);
reply = new RpcReply<Object>() {
@Override
public Object getPayload() {
- return resp.getPayload();
+ return response.getPayload();
}
};
} catch (IOException ex) {
// TODO: Pass exception back to the caller
- System.out.println("Error in RPC send. Input could not be serialized[" + input + "]");
+ ex.printStackTrace();
}
return reply;
Runnable task = new Runnable() {
public void run() {
- System.out.println(
- Thread.currentThread().getName() + "-Publisher started at port[" + pubPort + "]");
-
try {
- System.out.println(
- Thread.currentThread().getName() + "-Sending announcement[" + notice + "]");
-
publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
publisher.send(Message.serialize(notice));
-
+ _logger.debug("Announcement sent [{}]", notice);
} catch (IOException ex) {
- System.out.println("Error in publishing");
+ _logger.error("Error in sending announcement [{}]", notice);
ex.printStackTrace();
}
- System.out.println(Thread.currentThread().getName() + "-Published message[" + notice
- + "]");
-
}
};
handlersPool.execute(task);
// TODO: do registration for instance based routing
QName rpcType = route.getType();
RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
+ _logger.debug("Routing table updated");
}
/**
Message msg = null;
try {
byte[] bytes = socket.recv();
- System.out.println("Received bytes:[" + bytes.length + "]");
+ _logger.debug("Received bytes:[{}]", bytes.length);
msg = (Message) Message.deserialize(bytes);
} catch (Throwable t) {
- System.out.println("Caught exception");
t.printStackTrace();
}
return msg;
@Override
public void onRpcImplementationAdded(QName name) {
- System.out.println("In ZeroMQ Rpc Listener onRpcImplementationAdded()");
+ _logger.debug("Announcing registration for [{}]", name);
RouteIdentifierImpl routeId = new RouteIdentifierImpl();
routeId.setType(name);
JSONTokener jt = new JSONTokener(result);
JSONObject json = new JSONObject(jt);
JSONArray subnetConfigs = json.getJSONArray("subnetConfig");
- Assert.assertEquals(subnetConfigs.length(), 0);
+ Assert.assertEquals(subnetConfigs.length(), 1); // should only get the default subnet
// Test GET subnet1 expecting 404
result = getJsonResult(baseURL + "default/subnet/" + name1);
private boolean isDefaultContainer = true;
private static final int REPLACE_RETRY = 1;
+ /* Information about the default subnet. If there have been no configured subnets, i.e.,
+ * subnets.size() == 0 or subnetsConfigList.size() == 0, then this subnet will be the
+ * only subnet returned. As soon as a user-configured subnet is created this one will
+ * vanish.
+ */
+ protected static SubnetConfig DEFAULT_SUBNETCONFIG;
+ protected static Subnet DEFAULT_SUBNET;
+ protected static String DEFAULT_SUBNET_NAME = "default (cannot be modifed)";
+ static{
+ DEFAULT_SUBNETCONFIG = new SubnetConfig(DEFAULT_SUBNET_NAME, "0.0.0.0/0", new ArrayList<String>());
+ DEFAULT_SUBNET = new Subnet(DEFAULT_SUBNETCONFIG);
+ }
+
public void notifySubnetChange(Subnet sub, boolean add) {
synchronized (switchManagerAware) {
for (Object subAware : switchManagerAware) {
@Override
public List<SubnetConfig> getSubnetsConfigList() {
- return new ArrayList<SubnetConfig>(subnetsConfigList.values());
+ // if there are no subnets, return the default subnet
+ if(subnetsConfigList.size() == 0){
+ return Collections.singletonList(DEFAULT_SUBNETCONFIG);
+ }else{
+ return new ArrayList<SubnetConfig>(subnetsConfigList.values());
+ }
}
@Override
public SubnetConfig getSubnetConfig(String subnet) {
- return subnetsConfigList.get(subnet);
+ // if there are no subnets, return the default subnet
+ if(subnetsConfigList.size() == 0 && subnet == DEFAULT_SUBNET_NAME){
+ return DEFAULT_SUBNETCONFIG;
+ }else{
+ return subnetsConfigList.get(subnet);
+ }
}
private List<SpanConfig> getSpanConfigList(Node node) {
@Override
public Subnet getSubnetByNetworkAddress(InetAddress networkAddress) {
+ // if there are no subnets, return the default subnet
+ if (subnets.size() == 0) {
+ return DEFAULT_SUBNET;
+ }
+
Subnet sub;
Set<InetAddress> indices = subnets.keySet();
for (InetAddress i : indices) {