* @see <a href="http://tools.ietf.org/html/draft-pfaff-ovsdb-proto-04#section-4.1.10">ovsdb spec</a>
* <p/>
*
- * @see org.opendaylight.ovsdb.lib.OvsDBClient#lock(String, LockStolenCallback)
+ * @see org.opendaylight.ovsdb.lib.OvsDBClient#lock(String, LockAquisitionCallback, LockStolenCallback)
*/
public interface LockStolenCallback {
package org.opendaylight.ovsdb.lib;
+import org.opendaylight.ovsdb.lib.message.TableUpdates;
+
public interface MonitorCallBack {
+
+ void update(TableUpdates result);
+
+ void exception(Throwable t);
}
package org.opendaylight.ovsdb.lib;
-public interface MonitorHandle {
+import java.io.Serializable;
+
+public class MonitorHandle implements Serializable{
+ String id;
+
+ public MonitorHandle(String id) {
+ this.id = id;
+ }
+
+ public String getId() {
+ return id;
+ }
}
import org.opendaylight.ovsdb.lib.operations.OperationResult;
import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
+import org.opendaylight.ovsdb.lib.schema.TableSchema;
import java.util.List;
/**
* ovsdb <a href="http://tools.ietf.org/html/draft-pfaff-ovsdb-proto-04#section-4.1.5">monitor</a> operation.
- * @param monitorRequest represents what needs to be monitored including a client specified monitor handle. This
+ * @param monitorRequests represents what needs to be monitored including a client specified monitor handle. This
* handle is used to later cancel ({@link #cancelMonitor(MonitorHandle)}) the monitor.
* @param callback receives the monitor response
*/
- public void monitor(MonitorRequest monitorRequest, MonitorCallBack callback);
+ public <E extends TableSchema<E>> MonitorHandle monitor(DatabaseSchema schema, List<MonitorRequest<E>> monitorRequests,
+ MonitorCallBack callback);
/**
* Cancels an existing monitor method.
package org.opendaylight.ovsdb.lib;
import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.ovsdb.lib.jsonrpc.Params;
import org.opendaylight.ovsdb.lib.message.MonitorRequest;
import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
+import org.opendaylight.ovsdb.lib.message.TableUpdates;
import org.opendaylight.ovsdb.lib.message.TransactBuilder;
+import org.opendaylight.ovsdb.lib.message.UpdateNotification;
import org.opendaylight.ovsdb.lib.operations.Operation;
import org.opendaylight.ovsdb.lib.operations.OperationResult;
import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
+import org.opendaylight.ovsdb.lib.schema.TableSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
public class OvsDBClientImpl implements OvsDBClient {
- ExecutorService executorService;
- String schemaName;
- OvsdbRPC rpc;
- Map<String, DatabaseSchema> schema = Maps.newHashMap();
- Queue<Throwable> exceptions;
+ protected static final Logger logger = LoggerFactory.getLogger(OvsDBClientImpl.class);
+ private ExecutorService executorService;
+ private OvsdbRPC rpc;
+ private Map<String, DatabaseSchema> schema = Maps.newHashMap();
+ private HashMap<String, MonitorCallBack> monitorCallbacks = Maps.newHashMap();
+ private Queue<Throwable> exceptions;
+ private OvsdbRPC.Callback rpcCallback;
public OvsDBClientImpl(OvsdbRPC rpc, ExecutorService executorService) {
this.rpc = rpc;
this.executorService = executorService;
}
- public OvsDBClientImpl() {
+ OvsDBClientImpl() {
+ }
+
+ void setupUpdateListner() {
+ if (rpcCallback == null) {
+ OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
+ @Override
+ public void update(Object node, UpdateNotification upadateNotification) {
+ Object key = upadateNotification.getContext();
+ MonitorCallBack monitorCallBack = monitorCallbacks.get(key);
+ if (monitorCallBack == null) {
+ //ignore ?
+ logger.info("callback received with context {}, but no known handler. Ignoring!", key);
+ return;
+ }
+ monitorCallBack.update(upadateNotification.getUpdate());
+ }
+
+ @Override
+ public void locked(Object node, List<String> ids) {
+
+ }
+
+ @Override
+ public void stolen(Object node, List<String> ids) {
+
+ }
+ };
+ this.rpcCallback = temp;
+ rpc.registerCallback(temp);
+ }
}
@Override
builder.addOperation(o);
}
- ListenableFuture<List<OperationResult>> transact = rpc.transact(builder);
- return transact;
+ return rpc.transact(builder);
}
@Override
- public void monitor(MonitorRequest monitorRequest, MonitorCallBack callback) {
- throw new UnsupportedOperationException("not yet implemented");
+ public <E extends TableSchema<E>> MonitorHandle monitor(final DatabaseSchema dbSchema,
+ List<MonitorRequest<E>> monitorRequest,
+ final MonitorCallBack callback) {
+
+ final ImmutableMap<String, MonitorRequest<E>> reqMap = Maps.uniqueIndex(monitorRequest,
+ new Function<MonitorRequest<E>, String>() {
+ @Override
+ public String apply(MonitorRequest<E> input) {
+ return input.getTableName();
+ }
+ });
+
+ final MonitorHandle monitorHandle = new MonitorHandle(UUID.randomUUID().toString());
+ registerCallback(monitorHandle, callback);
+
+ ListenableFuture<TableUpdates> monitor = rpc.monitor(new Params() {
+ @Override
+ public List<Object> params() {
+ return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
+ }
+ });
+ Futures.addCallback(monitor, new FutureCallback<TableUpdates>() {
+ @Override
+ public void onSuccess(TableUpdates result) {
+ callback.update(result);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ callback.exception(t);
+ }
+ });
+
+ return monitorHandle;
+ }
+
+ private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback) {
+ this.monitorCallbacks.put(monitorHandle.getId(), callback);
+ setupUpdateListner();
}
@Override
DatabaseSchema databaseSchema = schema.get(database);
- if (databaseSchema == null) {
- ListenableFuture<Map<String, DatabaseSchema>> schemaFromDevice = getSchemaFromDevice(Lists.newArrayList(database));
+ if (databaseSchema == null || cacheResult) {
+ return Futures.transform(
+ getSchemaFromDevice(Lists.newArrayList(database)),
+ new Function<Map<String, DatabaseSchema>, DatabaseSchema>() {
+ @Override
+ public DatabaseSchema apply(Map<String, DatabaseSchema> result) {
+ if (result.containsKey(database)) {
+ DatabaseSchema s = result.get(database);
+ if (cacheResult) {
+ OvsDBClientImpl.this.schema.put(database, s);
+ }
+ return s;
+ } else {
+ return null;
+ }
+ }
+ }, executorService);
- final SettableFuture<DatabaseSchema> future = SettableFuture.create();
- Futures.addCallback(schemaFromDevice, new FutureCallback<Map<String, DatabaseSchema>>() {
- @Override
- public void onSuccess(Map<String, DatabaseSchema> result) {
- if (result.containsKey(database)) {
- DatabaseSchema s = result.get(database);
- if (cacheResult) {
- OvsDBClientImpl.this.schema.put(database, s);
- }
- future.set(s);
- } else {
- future.set(null);
- }
- }
-
- @Override
- public void onFailure(Throwable t) {
- //todo: should wrap
- future.setException(t);
- }
- });
- return future;
} else {
return Futures.immediateFuture(databaseSchema);
}
private void _populateSchema(final List<String> dbNames,
- final Map<String, DatabaseSchema> schema,
- final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
+ final Map<String, DatabaseSchema> schema,
+ final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
if (dbNames == null || dbNames.isEmpty()) {
return;
Futures.transform(rpc.get_schema(Lists.newArrayList(dbNames.get(0))),
new com.google.common.base.Function<JsonNode, Void>() {
- @Override
- public Void apply(JsonNode jsonNode) {
- try{
- schema.put(dbNames.get(0), DatabaseSchema.fromJson(jsonNode));
- if (schema.size() > 1 && !sfuture.isCancelled()) {
- _populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
- } else if (schema.size() == 1) {
- sfuture.set(schema);
- }
- } catch (Throwable e) {
- sfuture.setException(e);
- }
- return null;
- }});
+ @Override
+ public Void apply(JsonNode jsonNode) {
+ try {
+ schema.put(dbNames.get(0), DatabaseSchema.fromJson(dbNames.get(0), jsonNode));
+ if (schema.size() > 1 && !sfuture.isCancelled()) {
+ _populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
+ } else if (schema.size() == 1) {
+ sfuture.set(schema);
+ }
+ } catch (Throwable e) {
+ sfuture.setException(e);
+ }
+ return null;
+ }
+ });
}
public void setRpc(OvsdbRPC rpc) {
--- /dev/null
+/*
+ *
+ * * Copyright (C) 2014 EBay Software Foundation
+ * *
+ * * This program and the accompanying materials are made available under the
+ * * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * * and is available at http://www.eclipse.org/legal/epl-v10.html
+ * *
+ * * Authors : Ashwin Raveendran
+ *
+ */
+
+package org.opendaylight.ovsdb.lib;
+
+public class ParsingException extends RuntimeException {
+
+ public ParsingException() {
+ }
+
+ public ParsingException(String message) {
+ super(message);
+ }
+
+ public ParsingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ParsingException(Throwable cause) {
+ super(cause);
+ }
+
+ public ParsingException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
*/
package org.opendaylight.ovsdb.lib.jsonrpc;
+import com.google.common.reflect.Invokable;
import io.netty.channel.Channel;
import java.lang.reflect.InvocationHandler;
JsonNode params = requestJson.get("params");
Object param = objectMapper.convertValue(params, parameters[1]);
try {
- m.invoke(callback, context, param);
+ Invokable from = Invokable.from(m);
+ from.setAccessible(true);
+ from.invoke(callback, context, param);
} catch (IllegalAccessException | InvocationTargetException e) {
logger.error("Unable to invoke callback " + m.getName(), e);
}
*/
package org.opendaylight.ovsdb.lib.message;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.opendaylight.ovsdb.lib.schema.TableSchema;
-import java.util.List;
-
-import org.opendaylight.ovsdb.lib.table.Column;
+import java.util.Set;
@JsonInclude(JsonInclude.Include.NON_NULL)
-public class MonitorRequest<E> {
-
- //@JsonSerialize(contentAs = ToStringSerializer.class)
- List<Column<E>> columns;
-
+public class MonitorRequest<E extends TableSchema<E>> {
+ @JsonIgnore String tableName;
+ Set<String> columns = Sets.newHashSet();
MonitorSelect select;
- public List<? extends Column> getColumns() {
- return columns;
+ public MonitorRequest() {
}
- public void setColumns(List<Column<E>> columns) {
+ public MonitorRequest(String tableName, Set<String> columns) {
+ this.tableName = tableName;
this.columns = columns;
}
+ public MonitorRequest(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
public MonitorSelect getSelect() {
return select;
this.select = select;
}
- public MonitorRequest<E> column(Column<E> column) {
- if (null == columns) {
- columns = Lists.newArrayList();
- }
- columns.add(column);
- return this;
+ public Set<String> getColumns() {
+ return columns;
}
+
+ public void setColumns(Set<String> columns) {
+ this.columns = columns;
+ }
+
}
*/
package org.opendaylight.ovsdb.lib.message;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import org.opendaylight.ovsdb.lib.jsonrpc.Params;
-import org.opendaylight.ovsdb.lib.table.Table;
+import com.google.common.collect.Sets;
+import org.opendaylight.ovsdb.lib.schema.ColumnSchema;
+import org.opendaylight.ovsdb.lib.schema.TableSchema;
import java.util.List;
-import java.util.Map;
-public class MonitorRequestBuilder implements Params {
+public class MonitorRequestBuilder<E extends TableSchema<E>> {
+
+ E tableSchema;
+ MonitorRequest<E> monitorRequest;
+
+ MonitorRequestBuilder(E tableSchema) {
+ this.tableSchema = tableSchema;
+ }
- Map<String, MonitorRequest> requests = Maps.newLinkedHashMap();
+ public static <T extends TableSchema<T>> MonitorRequestBuilder<T> builder(T tableSchema) {
+ return new MonitorRequestBuilder<>(tableSchema);
+ }
+
+ MonitorRequest<E> getMonitorRequest() {
+ if (monitorRequest == null) {
+ monitorRequest = new MonitorRequest<>();
+ monitorRequest.setColumns(Sets.<String>newHashSet());
+ }
+ return monitorRequest;
+ }
+
+ public MonitorRequestBuilder<E> addColumn(String column) {
+ getMonitorRequest().getColumns().add(column);
+ return this;
+ }
+
+ public MonitorRequestBuilder<E> addColumn(ColumnSchema<?, ?> column) {
+ this.addColumn(column.getName());
+ return this;
+ }
+
+ public MonitorRequestBuilder<E> addColumns(List<ColumnSchema<E, ?>> columns) {
+ for(ColumnSchema<E, ?> schema : columns) {
+ this.addColumn(schema);
+ }
+ return this;
+ }
- @Override
- public List<Object> params() {
- return Lists.newArrayList("Open_vSwitch", null, requests);
+ public MonitorRequestBuilder<E> with(MonitorSelect select) {
+ getMonitorRequest().setSelect(select);
+ return this;
}
- public <T extends Table> MonitorRequest<T> monitor(T table) {
- MonitorRequest<T> req = new MonitorRequest<T>();
- requests.put(table.getTableName().getName(), req);
- return req;
+ public MonitorRequest<E> build() {
+ MonitorRequest<E> monitorRequest = getMonitorRequest();
+ if (monitorRequest.getSelect() == null) {
+ monitorRequest.setSelect(new MonitorSelect());
+ }
+ monitorRequest.setTableName(tableSchema.getName());
+ return monitorRequest;
}
}
public class MonitorSelect {
- boolean inital;
+ boolean initial;
boolean insert;
boolean delete;
boolean modify;
- public boolean isInital() {
- return inital;
+ public MonitorSelect(boolean initial, boolean insert, boolean delete, boolean modify) {
+ this.initial = initial;
+ this.insert = insert;
+ this.delete = delete;
+ this.modify = modify;
+ }
+
+ public MonitorSelect() {
+ }
+
+ public boolean isInitial() {
+ return initial;
}
- public void setInital(boolean inital) {
- this.inital = inital;
+ public void setInitial(boolean initial) {
+ this.initial = initial;
}
public boolean isInsert() {
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.ovsdb.lib.jsonrpc.Params;
+
public interface OvsdbRPC {
public static final String REGISTER_CALLBACK_METHOD = "registerCallback";
public ListenableFuture<List<String>> echo();
- public ListenableFuture<TableUpdates> monitor(MonitorRequestBuilder request);
+ public ListenableFuture<TableUpdates> monitor(Params equest);
public ListenableFuture<List<String>> list_dbs();
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Maps;
+import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
import org.opendaylight.ovsdb.lib.table.Bridge;
import org.opendaylight.ovsdb.lib.table.Capability;
import org.opendaylight.ovsdb.lib.table.Interface;
public void setIPFIXUpdate(TableUpdate<IPFIX> IPFIXUpdate) {
put(IPFIX.NAME, IPFIXUpdate);
}
+
+ public TableUpdate getUpdate(GenericTableSchema table) {
+ //todo Horrible just for time being, before this whole thing is refactored.
+ for (Map.Entry<Table.Name, TableUpdate> s : this.map.entrySet()) {
+ if (table.getName().equals(s.getKey().getName())) {
+ return s.getValue();
+ }
+ }
+ return null;
+ }
}
+++ /dev/null
-/*
- *
- * * Copyright (C) 2014 EBay Software Foundation
- * *
- * * This program and the accompanying materials are made available under the
- * * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * * and is available at http://www.eclipse.org/legal/epl-v10.html
- * *
- * * Authors : Ashwin Raveendran
- *
- */
-
-package org.opendaylight.ovsdb.lib.schema;
-
-public class ATableSchema extends TableSchema<ATableSchema> {
-}
public void validate(Object value)throws RuntimeException {
//todo(type check and validate based on constraints set)
}
+
+ /**
+ * Verifies if this Column if of the specified type
+ * @param type the type to check for
+ */
+ public void validateType(Class<?> type) {
+
+ }
}
return columnType;
}
}
- //todo mode to speicfic typed exception
+ //todo move to speicfic typed exception
throw new RuntimeException(String.format("could not find the right column type %s",
JsonUtils.prettyString(json)));
}
/**
- * Creates a ColumnType from the JsonNode if the type knows how to,
- * returns null otherwise
+ * Creates a ColumnType from the JsonNode if the implementation knows how to, returns null otherwise
*
* @param json the JSONNode object that needs to converted
* @return a valid SubType or Null (if the JsonNode does not represent
package org.opendaylight.ovsdb.lib.schema;
import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.reflect.Invokable;
+import org.opendaylight.ovsdb.lib.ParsingException;
import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-
+/**
+ * Represents an ovsdb database schema, which is comprised of a set of tables.
+ */
public class DatabaseSchema {
public static Logger logger = LoggerFactory.getLogger(DatabaseSchema.class);
- public Map<String, TableSchema> tables;
+ private String name;
+ private Map<String, TableSchema> tables;
public DatabaseSchema(Map<String, TableSchema> tables) {
this.tables = tables;
}
+ public DatabaseSchema(String name, Map<String, TableSchema> tables) {
+ this.name = name;
+ this.tables = tables;
+ }
+
public Set<String> getTables() {
return this.tables.keySet();
}
return this.getTables().contains(table);
}
- public TableSchema getTable(String table) {
- return this.tables.get(table);
+ public TransactionBuilder beginTransaction() {
+ return new TransactionBuilder(this);
+ }
+
+ public <E extends TableSchema<E>> E table(String tableName, Class<E> clazz) {
+ TableSchema<E> table = tables.get(tableName);
+
+ if (clazz.isInstance(table)) {
+ return clazz.cast(table);
+ }
+
+ return createTableSchema(clazz, table);
+ }
+
+ protected <E extends TableSchema<E>> E createTableSchema(Class<E> clazz, TableSchema<E> table) {
+ Constructor<E> declaredConstructor = null;
+ try {
+ declaredConstructor = clazz.getDeclaredConstructor(TableSchema.class);
+ } catch (NoSuchMethodException e) {
+ String message = String.format("Class %s does not have public constructor that accepts TableSchema object",
+ clazz);
+ throw new IllegalArgumentException(message, e);
+ }
+ Invokable<E, E> invokable = Invokable.from(declaredConstructor);
+ try {
+ return invokable.invoke(null, table);
+ } catch (Exception e) {
+ String message = String.format("Not able to create instance of class %s using public constructor " +
+ "that accepts TableSchema object", clazz);
+ throw new IllegalArgumentException(message, e);
+ }
}
- public static DatabaseSchema fromJson(JsonNode json) {
+ //todo : this needs to move to a custom factory
+ public static DatabaseSchema fromJson(String dbName, JsonNode json) {
if (!json.isObject() || !json.has("tables")) {
- //todo specific types of exception
- throw new RuntimeException("bad databaseschema root, expected \"tables\" as child");
+ throw new ParsingException("bad DatabaseSchema root, expected \"tables\" as child but was not found");
}
Map<String, TableSchema> tables = new HashMap<>();
- //Iterator<Map.Entry<String,JsonNode>> fields = json.fields();
for (Iterator<Map.Entry<String, JsonNode>> iter = json.get("tables").fields(); iter.hasNext(); ) {
Map.Entry<String, JsonNode> table = iter.next();
logger.debug("Read schema for table[{}]:{}", table.getKey(), table.getValue());
- tables.put(table.getKey(), TableSchema.fromJson(table.getKey(), table.getValue()));
+ //todo : this needs to done by a factory
+ tables.put(table.getKey(), new GenericTableSchema().fromJson(table.getKey(), table.getValue()));
}
- return new DatabaseSchema(tables);
+ return new DatabaseSchema(dbName, tables);
}
- public TransactionBuilder beginTransaction() {
- return new TransactionBuilder(this);
+ public String getName() {
+ return name;
}
- public <E extends TableSchema<E>> TableSchema<E> table(String tableName) {
- //todo : error handling
- return tables.get(tableName);
- }
-
- public <E extends TableSchema<E>> E table(String tableName, Class<E> clazz) {
- TableSchema<E> table = table(tableName);
- return table.as(clazz);
+ public void setName(String name) {
+ this.name = name;
}
}
--- /dev/null
+/*
+ *
+ * * Copyright (C) 2014 EBay Software Foundation
+ * *
+ * * This program and the accompanying materials are made available under the
+ * * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * * and is available at http://www.eclipse.org/legal/epl-v10.html
+ * *
+ * * Authors : Ashwin Raveendran
+ *
+ */
+
+package org.opendaylight.ovsdb.lib.schema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+public class GenericTableSchema extends TableSchema<GenericTableSchema> {
+
+ public GenericTableSchema() {
+ }
+
+ public GenericTableSchema(String tableName) {
+ super(tableName);
+ }
+
+ public GenericTableSchema fromJson(String tableName, JsonNode json) {
+
+ if (!json.isObject() || !json.has("columns")) {
+ //todo specific types of exception
+ throw new RuntimeException("bad tableschema root, expected \"columns\" as child");
+ }
+
+ Map<String, ColumnSchema> columns = new HashMap<>();
+ for (Iterator<Map.Entry<String, JsonNode>> iter = json.get("columns").fields(); iter.hasNext(); ) {
+ Map.Entry<String, JsonNode> column = iter.next();
+ logger.debug("%s:%s", tableName, column.getKey());
+ columns.put(column.getKey(), ColumnSchema.fromJson(column.getKey(), column.getValue()));
+ }
+
+ this.setName(tableName);
+ this.setColumns(columns);
+ return this;
+ }
+}
*/
package org.opendaylight.ovsdb.lib.schema;
-import com.fasterxml.jackson.databind.JsonNode;
import org.opendaylight.ovsdb.lib.operations.Insert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-public class TableSchema<E extends TableSchema<E>> {
+public abstract class TableSchema<E extends TableSchema<E>> {
+
protected static final Logger logger = LoggerFactory.getLogger(TableSchema.class);
private String name;
public TableSchema() {
}
+ protected TableSchema(String name) {
+ this.name = name;
+ }
+
public TableSchema(String name, Map<String, ColumnSchema> columns) {
this.name = name;
this.columns = columns;
return this.getColumns().contains(column);
}
- public ColumnSchema getColumn(String column) {
- return this.columns.get(column);
- }
public ColumnType getColumnType(String column) {
return this.columns.get(column).getType();
}
- public static TableSchema fromJson(String tableName, JsonNode json) {
-
- if (!json.isObject() || !json.has("columns")) {
- //todo specific types of exception
- throw new RuntimeException("bad tableschema root, expected \"columns\" as child");
- }
-
- Map<String, ColumnSchema> columns = new HashMap<>();
- for (Iterator<Map.Entry<String, JsonNode>> iter = json.get("columns").fields(); iter.hasNext(); ) {
- Map.Entry<String, JsonNode> column = iter.next();
- logger.debug("%s:%s", tableName, column.getKey());
- columns.put(column.getKey(), ColumnSchema.fromJson(column.getKey(), column.getValue()));
- }
-
- TableSchema tableSchema = new TableSchema(tableName, columns);
- return tableSchema;
- }
-
public <E extends TableSchema<E>> E as(Class<E> clazz) {
try {
Constructor<E> e = clazz.getConstructor(TableSchema.class);
public <D> ColumnSchema<E, D> column(String column, Class<D> type) {
//todo exception handling
- return columns.get(column);
+
+ ColumnSchema columnSchema = columns.get(column);
+ columnSchema.validateType(type);
+ return columnSchema;
+ }
+
+ public ColumnSchema column(String column) {
+ return this.columns.get(column);
}
public String getName() {
return name;
}
- public void setName(String name) {
+ protected void setName(String name) {
this.name = name;
}
+ protected void setColumns(Map<String, ColumnSchema> columns) {
+ this.columns = columns;
+ }
}
*/
package org.opendaylight.ovsdb.lib;
-import static org.opendaylight.ovsdb.lib.operations.Operations.op;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
import junit.framework.Assert;
-
import org.junit.Before;
import org.junit.Test;
+import org.opendaylight.ovsdb.lib.message.MonitorRequest;
+import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
+import org.opendaylight.ovsdb.lib.message.MonitorSelect;
import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
+import org.opendaylight.ovsdb.lib.message.TableUpdate;
+import org.opendaylight.ovsdb.lib.message.TableUpdates;
import org.opendaylight.ovsdb.lib.message.UpdateNotification;
import org.opendaylight.ovsdb.lib.operations.OperationResult;
-import org.opendaylight.ovsdb.lib.schema.ATableSchema;
import org.opendaylight.ovsdb.lib.schema.ColumnSchema;
import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
+import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
import org.opendaylight.ovsdb.lib.schema.TableSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.ListenableFuture;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.opendaylight.ovsdb.lib.operations.Operations.op;
public class OvsDBClientTestIT extends OvsdbTestBase {
OvsDBClientImpl ovs;
-
-
@Test
public void testTransact() throws IOException, InterruptedException, ExecutionException {
ListenableFuture<DatabaseSchema> schema = ovs.getSchema(OvsDBClient.OPEN_VSWITCH_SCHEMA, true);
- TableSchema<ATableSchema> bridge = schema.get().table("Bridge");
+ TableSchema<GenericTableSchema> bridge = schema.get().table("Bridge", GenericTableSchema.class);
for (Map.Entry<String, ColumnSchema> names : bridge.getColumnSchemas().entrySet()) {
System.out.println("names = " + names.getKey());
System.out.println("names.getValue().getType() = " + names.getValue().getType().getBaseType());
}
- ColumnSchema<ATableSchema, String> name = bridge.column("name", String.class);
- ColumnSchema<ATableSchema, String> fail_mode = bridge.column("fail_mode", String.class);
+ ColumnSchema<GenericTableSchema, String> name = bridge.column("name", String.class);
+ ColumnSchema<GenericTableSchema, String> fail_mode = bridge.column("fail_mode", String.class);
ListenableFuture<List<OperationResult>> results = ovs.transactBuilder()
.add(op.insert(bridge).value(name, "br-int"))
System.out.println("operationResults = " + operationResults);
}
+ @Test
+ public void testMonitorRequest() throws ExecutionException, InterruptedException {
+
+ DatabaseSchema dbSchema = ovs.getSchema(OvsDBClient.OPEN_VSWITCH_SCHEMA, true).get();
+ GenericTableSchema bridge = dbSchema.table("Bridge", GenericTableSchema.class);
+
+ List<MonitorRequest<GenericTableSchema>> monitorRequests = Lists.newArrayList();
+ monitorRequests.add(
+ MonitorRequestBuilder.builder(bridge)
+ .addColumn(bridge.column("name"))
+ .addColumn(bridge.column("fail_mode", String.class))
+ .with(new MonitorSelect(true, true, true, true))
+ .build());
+
+ final List<Object> results = Lists.newArrayList();
+
+ MonitorHandle monitor = ovs.monitor(dbSchema, monitorRequests, new MonitorCallBack() {
+ @Override
+ public void update(TableUpdates result) {
+ results.add(result);
+ System.out.println("result = " + result);
+ }
+
+ @Override
+ public void exception(Throwable t) {
+ results.add(t);
+ System.out.println("t = " + t);
+ }
+ });
+
+ //for (int i = 0; i < 5 && results.isEmpty(); i++) { //wait 5 seconds to get a result
+ for (int i = 0; i < 500 ; i++) { //wait 5 seconds to get a result
+ System.out.println("waiting");
+ Thread.sleep(1000);
+ }
+
+ Assert.assertTrue(!results.isEmpty());
+ Object result = results.get(0);
+ Assert.assertTrue(result instanceof TableUpdates);
+ TableUpdate bridgeUpdate = ((TableUpdates) result).getUpdate(bridge);
+ Assert.assertNotNull(bridgeUpdate);
+ }
+
@Test
public void testGetDBs() throws ExecutionException, InterruptedException {
ListenableFuture<List<String>> databases = ovs.getDatabases();
// TODO Auto-generated method stub
}
+
@Override
public void locked(Object node, List<String> ids) {
// TODO Auto-generated method stub
JsonNode jsonNode = mapper.readTree(resourceAsStream);
System.out.println("jsonNode = " + jsonNode.get("id"));
- DatabaseSchema schema = DatabaseSchema.fromJson(jsonNode.get("result"));
+ DatabaseSchema schema = DatabaseSchema.fromJson("some", jsonNode.get("result"));
assertNotNull(schema);
}
-ovsdbserver.ipaddress=192.168.56.104
-ovsdbserver.port=6644
\ No newline at end of file
+ovsdbserver.ipaddress=192.168.111.135
+ovsdbserver.port=5000
\ No newline at end of file
--- /dev/null
+/*
+ * [[ Authors will Fill in the Copyright header ]]
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Authors : Brent Salisbury, Madhu Venugopal, Aswin Raveendran
+ */
+package org.opendaylight.ovsdb.lib.message;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.collections.MapUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.sal.connection.ConnectionConstants;
+import org.opendaylight.controller.sal.core.Node;
+import org.opendaylight.ovsdb.lib.database.DatabaseSchema;
+import org.opendaylight.ovsdb.lib.operations.InsertOperation;
+import org.opendaylight.ovsdb.lib.operations.MutateOperation;
+import org.opendaylight.ovsdb.lib.operations.Operation;
+import org.opendaylight.ovsdb.lib.operations.OperationResult;
+import org.opendaylight.ovsdb.lib.notation.Condition;
+import org.opendaylight.ovsdb.lib.notation.Function;
+import org.opendaylight.ovsdb.lib.notation.Mutation;
+import org.opendaylight.ovsdb.lib.notation.Mutator;
+import org.opendaylight.ovsdb.lib.notation.OvsDBSet;
+import org.opendaylight.ovsdb.lib.notation.UUID;
+import org.opendaylight.ovsdb.lib.table.Bridge;
+import org.opendaylight.ovsdb.lib.table.Interface;
+import org.opendaylight.ovsdb.lib.table.Open_vSwitch;
+import org.opendaylight.ovsdb.lib.table.Port;
+import org.opendaylight.ovsdb.lib.table.internal.Table;
+import org.opendaylight.ovsdb.lib.table.internal.Tables;
+import org.opendaylight.ovsdb.plugin.Connection;
+import org.opendaylight.ovsdb.plugin.ConnectionService;
+import org.opendaylight.ovsdb.plugin.InventoryService;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class OVSDBNettyFactoryIT {
+ InventoryService inventoryService;
+ private static String bridgeIdentifier = "br1";
+ private Properties props;
+
+ @Before
+ public void initialize() throws IOException {
+ InputStream is = this.getClass().getClassLoader()
+ .getResourceAsStream(
+ "org/opendaylight/ovsdb/lib/message/integration-test.properties");
+ if (is == null) {
+ throw new IOException("Unable to load integration-test.properties");
+ }
+ props = new Properties();
+ props.load(is);
+
+ }
+
+ @Test
+ public void testSome() throws InterruptedException, ExecutionException,
+ IOException {
+ ConnectionService connectionService = new ConnectionService();
+ connectionService.init();
+ inventoryService = new InventoryService();
+ inventoryService.init();
+ connectionService.setInventoryServiceInternal(inventoryService);
+ Node.NodeIDType.registerIDType("OVS", String.class);
+ Map<ConnectionConstants, String> params = new HashMap<ConnectionConstants, String>();
+ params.put(ConnectionConstants.ADDRESS,
+ props.getProperty("ovsdbserver.ipaddress"));
+ params.put(ConnectionConstants.PORT,
+ props.getProperty("ovsdbserver.port", "6640"));
+ Node node = connectionService.connect("TEST", params);
+ if (node == null) {
+ throw new IOException("Unable to connect to the host");
+ }
+
+ Connection connection = connectionService.getConnection(node);
+ if (connection == null) {
+ throw new IOException("Unable to connect to the host");
+ }
+
+ OvsdbRPC ovsdb = connection.getRpc();
+ if (ovsdb == null) {
+ throw new IOException("Unable to obtain RPC instance");
+ }
+
+ //GET DB-SCHEMA
+ List<String> dbNames = Arrays.asList(Open_vSwitch.NAME.getName());
+ ListenableFuture<DatabaseSchema> dbSchemaF = null; //ovsdb.get_schema(dbNames);
+ DatabaseSchema databaseSchema = dbSchemaF.get();
+ MapUtils.debugPrint(System.out, null, databaseSchema.getTables());
+
+ // TEST MONITOR
+ // YES it is expected to fail with "duplicate monitor ID" as we have a perpetual monitor in Inventory Service
+ MonitorRequestBuilder monitorReq = null; //(ashwin) new MonitorRequestBuilder();
+ for (Table<?> table : Tables.getTables()) {
+ //(ashwin) monitorReq.monitor(table);
+ }
+
+ ListenableFuture<TableUpdates> monResponse = null; //(ashwin) ovsdb.monitor(monitorReq);
+ System.out.println("Monitor Request sent :");
+ TableUpdates updates = monResponse.get();
+ inventoryService.processTableUpdates(node, updates);
+ inventoryService.printCache(node);
+
+ // TRANSACT INSERT TEST
+
+ Map<String, Table<?>> ovsTable = inventoryService.getTableCache(node, Open_vSwitch.NAME.getName());
+ String newBridge = "new_bridge";
+ String newInterface = "new_interface";
+ String newPort = "new_port";
+ String newSwitch = "new_switch";
+
+ Operation addSwitchRequest = null;
+
+ if(ovsTable != null){
+ String ovsTableUUID = (String) ovsTable.keySet().toArray()[0];
+ UUID bridgeUuidPair = new UUID(newBridge);
+ Mutation bm = new Mutation("bridges", Mutator.INSERT, bridgeUuidPair);
+ List<Mutation> mutations = new ArrayList<Mutation>();
+ mutations.add(bm);
+
+ UUID uuid = new UUID(ovsTableUUID);
+ Condition condition = new Condition("_uuid", Function.EQUALS, uuid);
+ List<Condition> where = new ArrayList<Condition>();
+ where.add(condition);
+ addSwitchRequest = new MutateOperation(Open_vSwitch.NAME.getName(), where, mutations);
+ }
+ else{
+ Open_vSwitch ovsTableRow = new Open_vSwitch();
+ OvsDBSet<UUID> bridges = new OvsDBSet<UUID>();
+ UUID bridgeUuidPair = new UUID(newBridge);
+ bridges.add(bridgeUuidPair);
+ ovsTableRow.setBridges(bridges);
+ addSwitchRequest = new InsertOperation(Open_vSwitch.NAME.getName(), newSwitch, ovsTableRow);
+ }
+
+ Bridge bridgeRow = new Bridge();
+ bridgeRow.setName(bridgeIdentifier);
+ OvsDBSet<UUID> ports = new OvsDBSet<UUID>();
+ UUID port = new UUID(newPort);
+ ports.add(port);
+ bridgeRow.setPorts(ports);
+ InsertOperation addBridgeRequest = new InsertOperation(Bridge.NAME.getName(), newBridge, bridgeRow);
+
+ Port portRow = new Port();
+ portRow.setName(bridgeIdentifier);
+ OvsDBSet<UUID> interfaces = new OvsDBSet<UUID>();
+ UUID interfaceid = new UUID(newInterface);
+ interfaces.add(interfaceid);
+ portRow.setInterfaces(interfaces);
+ InsertOperation addPortRequest = new InsertOperation(Port.NAME.getName(), newPort, portRow);
+
+ Interface interfaceRow = new Interface();
+ interfaceRow.setName(bridgeIdentifier);
+ interfaceRow.setType("internal");
+ InsertOperation addIntfRequest = new InsertOperation(Interface.NAME.getName(), newInterface, interfaceRow);
+
+ TransactBuilder transaction = new TransactBuilder();
+ transaction.addOperations(new ArrayList<Operation>(
+ Arrays.asList(addSwitchRequest, addIntfRequest, addPortRequest, addBridgeRequest)));
+
+ ListenableFuture<List<OperationResult>> transResponse = ovsdb.transact(transaction);
+ System.out.println("Transcation sent :");
+ List<OperationResult> tr = transResponse.get();
+ System.out.println("Transaction response : "+transResponse.toString());
+ List<Operation> requests = transaction.getRequests();
+ for (int i = 0; i < tr.size() ; i++) {
+ if (i < requests.size()) requests.get(i).setResult(tr.get(i));
+ }
+
+ System.out.println("Request + Response : "+requests.toString());
+ if (tr.size() > requests.size()) {
+ System.out.println("ERROR : "+tr.get(tr.size()-1).getError());
+ System.out.println("Details : "+tr.get(tr.size()-1).getDetails());
+ }
+
+ // TEST ECHO
+
+ ListenableFuture<List<String>> some = ovsdb.echo();
+ Object s = some.get();
+ System.out.printf("Result of echo is %s \n", s);
+
+ // TEST ECHO REQUEST/REPLY
+ Thread.sleep(10000);
+
+ connectionService.disconnect(node);
+ }
+}
DatabaseSchema databaseSchema = dbSchemaF.get();
inventoryServiceInternal.updateDatabaseSchema(connection.getNode(), databaseSchema);
- MonitorRequestBuilder monitorReq = new MonitorRequestBuilder();
+ MonitorRequestBuilder monitorReq = null; //ashwin(not sure if we need) : new MonitorRequestBuilder();
for (Table<?> table : Tables.getTables()) {
if (databaseSchema.getTables().keySet().contains(table.getTableName().getName())) {
- monitorReq.monitor(table);
+ //ashwin(not sure if we need) monitorReq.monitor(table);
} else {
logger.debug("We know about table {} but it is not in the schema of {}", table.getTableName().getName(), connection.getNode().getNodeIDString());
}
}
- ListenableFuture<TableUpdates> monResponse = connection.getRpc().monitor(monitorReq);
+ ListenableFuture<TableUpdates> monResponse = null; //ashwin(not sure if we need)connection.getRpc().monitor(monitorReq);
TableUpdates updates = monResponse.get();
if (updates.getError() != null) {
logger.error("Error configuring monitor, error : {}, details : {}",