今天教大家藉助一款框架快速實現一個資料庫,這個框架就是Calcite
,下面會帶大家通過兩個例子快速教會大家怎麼實現,一個是可以通過 SQL 語句的方式可以直接查詢檔案內容,第二個是模擬 Mysql 查詢功能,以及最後告訴大家怎麼實現 SQL 查詢 Kafka 資料。
Calcite
是一個用於優化異構資料來源的查詢處理的可插拔基礎框架(他是一個框架),可以將任意資料(Any data, Anywhere)DML 轉換成基於 SQL 的 DML 引擎,並且我們可以選擇性的使用它的部分功能。
使用 SQL 存取記憶體中某個資料
使用 SQL 存取某個檔案的資料
跨資料來源的資料存取、聚合、排序等(例如 Mysql 和 Redis 資料來源中的資料進行join)
當我們需要自建一個資料庫的時候,資料可以為任何格式的,比如text、word、xml、mysql、es、csv、第三方介面資料等等,我們只有資料,我們想讓這些資料支援 SQL 形式動態增刪改查。
另外,像Hive、Drill、Flink、Phoenix 和 Storm 等專案中,資料處理系統都是使用 Calcite 來做 SQL 解析和查詢優化,當然,還有部分用來構建自己的 JDBC driver。
就是將標準 SQL(可以理解為Mysql)關鍵詞以及關鍵詞之間的字串擷取出來,每一個token
,會被封裝為一個SqlNode
,SqlNode
會衍生很多子類,比如Select
會被封裝為SqlSelect
,當前 SqlNode
也能反解析為 SQL 文字。
某個欄位的名稱和型別資訊
多個 RelDataTypeField 組成了 RelDataType,可以理解為資料行
一個完整的表的資訊
所有後設資料的組合,可以理解為一組 Table 或者庫的概念
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<!-- 目前最新版本 2022-09-10日更新-->
<version>1.32.0</version>
</dependency>
model.json 裡面主要描述或者說告訴 Calcite
如何建立 Schema
,也就是告訴框架怎麼建立出庫。
{
"version": "1.0",//忽略
"defaultSchema": "CSV",//設定預設的schema
"schemas": [//可定義多個schema
{
"name": "CSV",//相當於namespace和上面的defaultSchema的值對應
"type": "custom",//寫死
"factory": "csv.CsvSchemaFactory",//factory的類名必須是你自己實現的factory的包的全路徑
"operand": { //這裡可以傳遞自定義引數,最終會以map的形式傳遞給factory的operand引數
"directory": "csv"//directory代表calcite會在resources下面的csv目錄下面讀取所有的csv檔案,factory建立的Schema會吧這些檔案全部構建成Table,可以理解為讀取資料檔案的根目錄,當然key的名稱也不一定非得用directory,你可以隨意指定
}
}
]
}
接下來還需要定義一個 csv
檔案,用來定義表結構。
NAME:string,MONEY:string
aixiaoxian,10000萬
xiaobai,10000萬
adong,10000萬
maomao,10000萬
xixi,10000萬
zizi,10000萬
wuwu,10000萬
kuku,10000萬
整個專案的結構大概就是這樣:
在上述檔案中指定的包路徑下去編寫 CsvSchemaFactory
類,實現 SchemaFactory
介面,並且實現裡面唯一的方法 create
方法,建立Schema
(庫)。
public class CsvSchemaFactory implements SchemaFactory {
/**
* parentSchema 父節點,一般為root
* name 為model.json中定義的名字
* operand 為model.json中定於的資料,這裡可以傳遞自定義引數
*
* @param parentSchema Parent schema
* @param name Name of this schema
* @param operand The "operand" JSON property
* @return
*/
@Override
public Schema create(SchemaPlus parentSchema, String name,
Map<String, Object> operand) {
final String directory = (String) operand.get("directory");
File directoryFile = new File(directory);
return new CsvSchema(directoryFile, "scannable");
}
}
有了 SchemaFactory
,接下來需要自定義 Schema
類。
自定義的 Schema
需要實現 Schema
介面,但是直接實現要實現的方法太多,我們去實現官方的 AbstractSchema
類,這樣就只需要實現一個方法就行(如果有其他客製化化需求可以實現原生介面)。
核心的邏輯就是createTableMap
方法,用於建立出 Table
表。
他會掃描指定的Resource
下面的所有 csv
檔案,將每個檔案對映成Table
物件,最終以map
形式返回,Schema
介面的其他幾個方法會用到這個物件。
//實現這一個方法就行了
@Override
protected Map<String, Table> getTableMap() {
if (tableMap == null) {
tableMap = createTableMap();
}
return tableMap;
}
private Map<String, Table> createTableMap() {
// Look for files in the directory ending in ".csv"
final Source baseSource = Sources.of(directoryFile);
//會自動過濾掉非指定檔案字尾的檔案,我這裡寫的csv
File[] files = directoryFile.listFiles((dir, name) -> {
final String nameSansGz = trim(name, ".gz");
return nameSansGz.endsWith(".csv");
});
if (files == null) {
System.out.println("directory " + directoryFile + " not found");
files = new File[0];
}
// Build a map from table name to table; each file becomes a table.
final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
for (File file : files) {
Source source = Sources.of(file);
final Source sourceSansCsv = source.trimOrNull(".csv");
if (sourceSansCsv != null) {
final Table table = createTable(source);
builder.put(sourceSansCsv.relative(baseSource).path(), table);
}
}
return builder.build();
}
Schema
有了,並且資料檔案 csv
也對映成 Table
了,一個 csv
檔案對應一個 Table
。
接下來我們去自定義 Table
,自定義 Table
的核心是我們要定義欄位的型別和名稱,以及如何讀取 csv
檔案。
csv
檔案頭中獲取(當前檔案頭需要我們自己定義,包括規則我們也可以客製化化)。/**
* Base class for table that reads CSV files.
*/
public abstract class CsvTable extends AbstractTable {
protected final Source source;
protected final @Nullable RelProtoDataType protoRowType;
private @Nullable RelDataType rowType;
private @Nullable List<RelDataType> fieldTypes;
/**
* Creates a CsvTable.
*/
CsvTable(Source source, @Nullable RelProtoDataType protoRowType) {
this.source = source;
this.protoRowType = protoRowType;
}
/**
* 建立一個CsvTable,繼承AbstractTable,需要實現裡面的getRowType方法,此方法就是獲取當前的表結構。
Table的型別有很多種,比如還有檢視型別,AbstractTable類中幫我們預設實現了Table介面的一些方法,比如getJdbcTableType 方法,預設為Table型別,如果有其他客製化化需求可直接實現Table介面。
和AbstractSchema很像
*/
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
if (protoRowType != null) {
return protoRowType.apply(typeFactory);
}
if (rowType == null) {
rowType = CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
null);
}
return rowType;
}
/**
* Returns the field types of this CSV table.
*/
public List<RelDataType> getFieldTypes(RelDataTypeFactory typeFactory) {
if (fieldTypes == null) {
fieldTypes = new ArrayList<>();
CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
fieldTypes);
}
return fieldTypes;
}
public static RelDataType deduceRowType(JavaTypeFactory typeFactory,
Source source, @Nullable List<RelDataType> fieldTypes) {
final List<RelDataType> types = new ArrayList<>();
final List<String> names = new ArrayList<>();
try (CSVReader reader = openCsv(source)) {
String[] strings = reader.readNext();
if (strings == null) {
strings = new String[]{"EmptyFileHasNoColumns:boolean"};
}
for (String string : strings) {
final String name;
final RelDataType fieldType;
//就是簡單的讀取字串冒號前面是名稱,冒號後面是型別
final int colon = string.indexOf(':');
if (colon >= 0) {
name = string.substring(0, colon);
String typeString = string.substring(colon + 1);
Matcher decimalMatcher = DECIMAL_TYPE_PATTERN.matcher(typeString);
if (decimalMatcher.matches()) {
int precision = Integer.parseInt(decimalMatcher.group(1));
int scale = Integer.parseInt(decimalMatcher.group(2));
fieldType = parseDecimalSqlType(typeFactory, precision, scale);
} else {
switch (typeString) {
case "string":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
break;
case "boolean":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BOOLEAN);
break;
case "byte":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TINYINT);
break;
case "char":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.CHAR);
break;
case "short":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.SMALLINT);
break;
case "int":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.INTEGER);
break;
case "long":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BIGINT);
break;
case "float":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.REAL);
break;
case "double":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DOUBLE);
break;
case "date":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DATE);
break;
case "timestamp":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIMESTAMP);
break;
case "time":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIME);
break;
default:
LOGGER.warn(
"Found unknown type: {} in file: {} for column: {}. Will assume the type of "
+ "column is string.",
typeString, source.path(), name);
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
break;
}
}
} else {
// 如果沒定義,預設都是String型別,欄位名稱也是string
name = string;
fieldType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
}
names.add(name);
types.add(fieldType);
if (fieldTypes != null) {
fieldTypes.add(fieldType);
}
}
} catch (IOException e) {
// ignore
}
if (names.isEmpty()) {
names.add("line");
types.add(typeFactory.createSqlType(SqlTypeName.VARCHAR));
}
return typeFactory.createStructType(Pair.zip(names, types));
}
}
Table
的表結構欄位名稱和型別都獲取到了以後,就剩最後一步了,獲取檔案中的資料。我們需要自定義一個類,實現 ScannableTable
介面,並且實現裡面唯一的方法 scan
方法,其實本質上就是讀檔案,然後把檔案的每一行的資料和上述獲取的 fileType
進行匹配。@Override
public Enumerable<Object[]> scan(DataContext root) {
JavaTypeFactory typeFactory = root.getTypeFactory();
final List<RelDataType> fieldTypes = getFieldTypes(typeFactory);
final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size());
final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
return new AbstractEnumerable<@Nullable Object[]>() {
@Override
public Enumerator<@Nullable Object[]> enumerator() {
//返回我們自定義的讀取資料的類
return new CsvEnumerator<>(source, cancelFlag, false, null,
CsvEnumerator.arrayConverter(fieldTypes, fields, false));
}
};
}
public CsvEnumerator(Source source, AtomicBoolean cancelFlag, boolean stream,
@Nullable String @Nullable [] filterValues, RowConverter<E> rowConverter) {
this.cancelFlag = cancelFlag;
this.rowConverter = rowConverter;
this.filterValues = filterValues == null ? null
: ImmutableNullableList.copyOf(filterValues);
try {
this.reader = openCsv(source);
//跳過第一行,因為第一行是定義型別和名稱的
this.reader.readNext(); // skip header row
} catch (IOException e) {
throw new RuntimeException(e);
}
}
//CsvEnumerator必須實現calcit自己的迭代器,裡面有current、moveNext方法,current是返回當前遊標所在的資料記錄,moveNext是將遊標指向下一個記錄,官網中自己定義了一個型別轉換器,是將csv檔案中的資料轉換成檔案頭指定的型別,這個需要我們自己來實現
@Override
public E current() {
return castNonNull(current);
}
@Override
public boolean moveNext() {
try {
outer:
for (; ; ) {
if (cancelFlag.get()) {
return false;
}
final String[] strings = reader.readNext();
if (strings == null) {
current = null;
reader.close();
return false;
}
if (filterValues != null) {
for (int i = 0; i < strings.length; i++) {
String filterValue = filterValues.get(i);
if (filterValue != null) {
if (!filterValue.equals(strings[i])) {
continue outer;
}
}
}
}
current = rowConverter.convertRow(strings);
return true;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
protected @Nullable Object convert(@Nullable RelDataType fieldType, @Nullable String string) {
if (fieldType == null || string == null) {
return string;
}
switch (fieldType.getSqlTypeName()) {
case BOOLEAN:
if (string.length() == 0) {
return null;
}
return Boolean.parseBoolean(string);
case TINYINT:
if (string.length() == 0) {
return null;
}
return Byte.parseByte(string);
case SMALLINT:
if (string.length() == 0) {
return null;
}
return Short.parseShort(string);
case INTEGER:
if (string.length() == 0) {
return null;
}
return Integer.parseInt(string);
case BIGINT:
if (string.length() == 0) {
return null;
}
return Long.parseLong(string);
case FLOAT:
if (string.length() == 0) {
return null;
}
return Float.parseFloat(string);
case DOUBLE:
if (string.length() == 0) {
return null;
}
return Double.parseDouble(string);
case DECIMAL:
if (string.length() == 0) {
return null;
}
return parseDecimal(fieldType.getPrecision(), fieldType.getScale(), string);
case DATE:
if (string.length() == 0) {
return null;
}
try {
Date date = TIME_FORMAT_DATE.parse(string);
return (int) (date.getTime() / DateTimeUtils.MILLIS_PER_DAY);
} catch (ParseException e) {
return null;
}
case TIME:
if (string.length() == 0) {
return null;
}
try {
Date date = TIME_FORMAT_TIME.parse(string);
return (int) date.getTime();
} catch (ParseException e) {
return null;
}
case TIMESTAMP:
if (string.length() == 0) {
return null;
}
try {
Date date = TIME_FORMAT_TIMESTAMP.parse(string);
return date.getTime();
} catch (ParseException e) {
return null;
}
case VARCHAR:
default:
return string;
}
}
至此我們需要準備的東西:庫、表名稱、欄位名稱、欄位型別都有了,接下來我們去寫我們的 SQL 語句查詢我們的資料檔案。
建立好幾個測試的資料檔案,例如上面專案結構中我建立 2 個 csv 檔案USERINFO.csv
、ASSET.csv
,然後建立測試類。
這樣跑起來,就可以通過 SQL 語句的方式直接查詢資料了。
public class Test {
public static void main(String[] args) throws SQLException {
Connection connection = null;
Statement statement = null;
try {
Properties info = new Properties();
info.put("model", Sources.of(Test.class.getResource("/model.json")).file().getAbsolutePath());
connection = DriverManager.getConnection("jdbc:calcite:", info);
statement = connection.createStatement();
print(statement.executeQuery("select * from asset "));
print(statement.executeQuery(" select * from userinfo "));
print(statement.executeQuery(" select age from userinfo where name ='aixiaoxian' "));
print(statement.executeQuery(" select * from userinfo where age >60 "));
print(statement.executeQuery(" select * from userinfo where name like 'a%' "));
} finally {
connection.close();
}
}
private static void print(ResultSet resultSet) throws SQLException {
final ResultSetMetaData metaData = resultSet.getMetaData();
final int columnCount = metaData.getColumnCount();
while (resultSet.next()) {
for (int i = 1; ; i++) {
System.out.print(resultSet.getString(i));
if (i < columnCount) {
System.out.print(", ");
} else {
System.out.println();
break;
}
}
}
}
}
查詢結果:
這裡在測試的時候踩到2個坑,大家如果自己實驗的時候可以避免下。
Calcite
預設會把你的 SQL 語句中的表名和類名全部轉換為大寫,因為預設的 csv(其他檔案也一樣)檔案的名稱就是表名,除非你自定義規則,所以你的檔名要寫成大寫。
Calcite
有一些預設的關鍵字不能用作表名,不然會查詢失敗,比如我剛開始定的user.csv
就一直查不出來,改成USERINFO
就可以了,這點和Mysql
的內建關鍵字差不多,也可以通過個性化設定去改。
Calcite
需要的東西:庫、表名稱、欄位名稱、欄位型別。如果資料來源使用Mysql
的話,這些都不用我們去 JAVA 服務中去定義,直接在 Mysql 使用者端建立好,這裡直接建立兩張表用於測試,就和我們的csv
檔案一樣。
CREATE TABLE `USERINFO1` (
`NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
`AGE` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
CREATE TABLE `ASSET` (
`NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
`MONEY` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
上述 csv
案例中的 SchemaFactory
以及 Schema
這些都不需要建立,因為 Calcite
預設提供了 Mysql 的 Adapter介面卡。
其實,上述兩步都不需要做,我們真正要做的是,告訴 Calcite
你的 JDBC 的連線資訊就行了,也是在 model.json
檔案中定義。
{
"version": "1.0",
"defaultSchema": "Demo",
"schemas": [
{
"name": "Demo",
"type": "custom",
// 這裡是calcite預設的SchemaFactory,裡面的流程和我們上述自己定義的相同,下面會簡單看看原始碼。
"factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
"operand": {
// 我用的是mysql8以上版本,所以這裡注意包的名稱
"jdbcDriver": "com.mysql.cj.jdbc.Driver",
"jdbcUrl": "jdbc:mysql://localhost:3306/irving",
"jdbcUser": "root",
"jdbcPassword": "123456"
}
}
]
}
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
public class TestMysql {
public static void main(String[] args) throws SQLException {
Connection connection = null;
Statement statement = null;
try {
Properties info = new Properties();
info.put("model", Sources.of(TestMysql.class.getResource("/mysqlmodel.json")).file().getAbsolutePath());
connection = DriverManager.getConnection("jdbc:calcite:", info);
statement = connection.createStatement();
statement.executeUpdate(" insert into userinfo1 values ('xxx',12) ");
print(statement.executeQuery("select * from asset "));
print(statement.executeQuery(" select * from userinfo1 "));
print(statement.executeQuery(" select age from userinfo1 where name ='aixiaoxian' "));
print(statement.executeQuery(" select * from userinfo1 where age >60 "));
print(statement.executeQuery(" select * from userinfo1 where name like 'a%' "));
} finally {
connection.close();
}
}
private static void print(ResultSet resultSet) throws SQLException {
final ResultSetMetaData metaData = resultSet.getMetaData();
final int columnCount = metaData.getColumnCount();
while (resultSet.next()) {
for (int i = 1; ; i++) {
System.out.print(resultSet.getString(i));
if (i < columnCount) {
System.out.print(", ");
} else {
System.out.println();
break;
}
}
}
}
}
查詢結果:
上述我們在 model.json
檔案中指定了org.apache.calcite.adapter.jdbc.JdbcSchema$Factory
類,可以看下這個類的程式碼。
這個類是把 Factory
和 Schema
寫在了一起,其實就是呼叫schemafactory
類的create
方法建立一個 schema
出來,和我們上面自定義的流程是一樣的。
其中JdbcSchema
類也是 Schema
的子類,所以也會實現getTable
方法(這個我們上述也實現了,我們當時是獲取表結構和表的欄位型別以及名稱,是從csv檔案頭中讀檔案的),JdbcSchema
的實現是通過連線 Mysql 伺服器端查詢後設資料資訊,再將這些資訊封裝成 Calcite
需要的物件格式。
這裡同樣要注意 csv
方式的2個注意點,大小寫和關鍵字問題。
public static JdbcSchema create(
SchemaPlus parentSchema,
String name,
Map<String, Object> operand) {
DataSource dataSource;
try {
final String dataSourceName = (String) operand.get("dataSource");
if (dataSourceName != null) {
dataSource =
AvaticaUtils.instantiatePlugin(DataSource.class, dataSourceName);
} else {
//會走在這裡來,這裡就是我們在model.json中指定的jdbc的連線資訊,最終會建立一個datasource
final String jdbcUrl = (String) requireNonNull(operand.get("jdbcUrl"), "jdbcUrl");
final String jdbcDriver = (String) operand.get("jdbcDriver");
final String jdbcUser = (String) operand.get("jdbcUser");
final String jdbcPassword = (String) operand.get("jdbcPassword");
dataSource = dataSource(jdbcUrl, jdbcDriver, jdbcUser, jdbcPassword);
}
} catch (Exception e) {
throw new RuntimeException("Error while reading dataSource", e);
}
String jdbcCatalog = (String) operand.get("jdbcCatalog");
String jdbcSchema = (String) operand.get("jdbcSchema");
String sqlDialectFactory = (String) operand.get("sqlDialectFactory");
if (sqlDialectFactory == null || sqlDialectFactory.isEmpty()) {
return JdbcSchema.create(
parentSchema, name, dataSource, jdbcCatalog, jdbcSchema);
} else {
SqlDialectFactory factory = AvaticaUtils.instantiatePlugin(
SqlDialectFactory.class, sqlDialectFactory);
return JdbcSchema.create(
parentSchema, name, dataSource, factory, jdbcCatalog, jdbcSchema);
}
}
@Override public @Nullable Table getTable(String name) {
return getTableMap(false).get(name);
}
private synchronized ImmutableMap<String, JdbcTable> getTableMap(
boolean force) {
if (force || tableMap == null) {
tableMap = computeTables();
}
return tableMap;
}
private ImmutableMap<String, JdbcTable> computeTables() {
Connection connection = null;
ResultSet resultSet = null;
try {
connection = dataSource.getConnection();
final Pair<@Nullable String, @Nullable String> catalogSchema = getCatalogSchema(connection);
final String catalog = catalogSchema.left;
final String schema = catalogSchema.right;
final Iterable<MetaImpl.MetaTable> tableDefs;
Foo threadMetadata = THREAD_METADATA.get();
if (threadMetadata != null) {
tableDefs = threadMetadata.apply(catalog, schema);
} else {
final List<MetaImpl.MetaTable> tableDefList = new ArrayList<>();
// 獲取後設資料
final DatabaseMetaData metaData = connection.getMetaData();
resultSet = metaData.getTables(catalog, schema, null, null);
while (resultSet.next()) {
//獲取庫名,表明等資訊
final String catalogName = resultSet.getString(1);
final String schemaName = resultSet.getString(2);
final String tableName = resultSet.getString(3);
final String tableTypeName = resultSet.getString(4);
tableDefList.add(
new MetaImpl.MetaTable(catalogName, schemaName, tableName,
tableTypeName));
}
tableDefs = tableDefList;
}
final ImmutableMap.Builder<String, JdbcTable> builder =
ImmutableMap.builder();
for (MetaImpl.MetaTable tableDef : tableDefs) {
final String tableTypeName2 =
tableDef.tableType == null
? null
: tableDef.tableType.toUpperCase(Locale.ROOT).replace(' ', '_');
final TableType tableType =
Util.enumVal(TableType.OTHER, tableTypeName2);
if (tableType == TableType.OTHER && tableTypeName2 != null) {
System.out.println("Unknown table type: " + tableTypeName2);
}
// 最終封裝成JdbcTable物件
final JdbcTable table =
new JdbcTable(this, tableDef.tableCat, tableDef.tableSchem,
tableDef.tableName, tableType);
builder.put(tableDef.tableName, table);
}
return builder.build();
} catch (SQLException e) {
throw new RuntimeException(
"Exception while reading tables", e);
} finally {
close(connection, null, resultSet);
}
}
OK,到這裡基本上兩個簡單的案例已經演示好了,最後補充一下整個Calcite
架構和整個 SQL 的執行流程。
整個流程如下:SQL解析(Parser)=> SQL校驗(Validator)=> SQL查詢優化(optimizer)=> SQL生成 => SQL執行
所有的 SQL 語句在執行前都需要經歷 SQL 解析器解析,解析器的工作內容就是將 SQL 中的 Token 解析成抽象語法樹,每個樹的節點都是一個 SqlNode,這個過程其實就是 Sql Text => SqlNode 的過程。
我們前面的 Demo 沒有自定義 Parser,是因為 Calcite 採用了自己預設的 Parser(SqlParserImpl)。
SqlNode
是整個解析中的核心,比如圖中你可以發現,對於每個比如select
、from
、where
關鍵字之後的內容其實都是一個SqlNode
。
parserConfig
方法主要是設定 SqlParserFactory 的引數,比如我們上面所說的我本地測試的時候踩的大小寫的坑,就可以在這裡設定。
直接呼叫setCaseSensitive=false
即不會將 SQL 語句中的表名列名轉為大寫,下面是預設的,其他的引數可以按需設定。
SQL 語句先經過 Parser,然後經過語法驗證器,注意 Parser 並不會驗證語法的正確性。
其實 Parser 只會驗證 SQL 關鍵詞的位置是否正確,我們上述2個 Parser 的例子中都沒有建立 schema
和 table
這些,但是如果這樣寫,那就會報錯,這個錯誤就是 parser
檢測後丟擲來的(ParseLocationErrorTest)。
真正的校驗在 validator
中,會去驗證查詢的表名是否存在,查詢的欄位是否存在,型別是否匹配,這個過程比較複雜,預設的 validator
是SqlValidatorImpl
。
比如關係代數,比如什麼投影、笛卡爾積這些,Calcite
提供了很多內部的優化器,也可以實現自己的優化器。
Calcite
是不包含儲存層的,所以提供一種介面卡的機制來存取外部的資料儲存或者儲存引擎。
官網裡面寫了未來會支援Kafka
介面卡到公共Api
中,到時候使用起來就和上述整合Mysql
一樣方便,但是現在還沒有支援,我這裡給大家提供個自己實現的方式,這樣就可以通過 SQL 的方式直接查詢 Kafka 中的 Topic 資料等資訊。
這裡我們內部整合實現了KSQL
的能力,查詢結果是OK的。
還是像上述步驟一樣,我們需要準備庫、表名稱、欄位名稱、欄位型別、資料來源(多出來的地方)。
Sql
解析,之前我們都沒有自定義解析,這裡需要自定義解析,是因為我需要動態解析sql
的where
條件裡面的partation
。SqlParseImpl
AST
,我們可以基於生成的SqlNode
做一些業務相關的校驗和引數解析 public class KafkaConsumerAdapter {
public static List<KafkaResult> executor(KafkaSqlInfo kafkaSql) {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaSql.getSeeds());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
List<TopicPartition> topics = new ArrayList<>();
for (Integer partition : kafkaSql.getPartition()) {
TopicPartition tp = new TopicPartition(kafkaSql.getTableName(), partition);
topics.add(tp);
}
consumer.assign(topics);
for (TopicPartition tp : topics) {
Map<TopicPartition, Long> offsets = consumer.endOffsets(Collections.singleton(tp));
long position = 500;
if (offsets.get(tp).longValue() > position) {
consumer.seek(tp, offsets.get(tp).longValue() - 500);
} else {
consumer.seek(tp, 0);
}
}
List<KafkaResult> results = new ArrayList<>();
boolean flag = true;
while (flag) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
//轉成我定義的物件集合
KafkaResult result = new KafkaResult();
result.setPartition(record.partition());
result.setOffset(record.offset());
result.setMsg(record.value());
result.setKey(record.key());
results.add(result);
}
if (!records.isEmpty()) {
flag = false;
}
}
consumer.close();
return results;
}
}
public class TestKafka {
public static void main(String[] args) throws Exception {
KafkaService kafkaService = new KafkaService();
//把解析到的引數放在我自己定義的kafkaSqlInfo物件中
KafkaSqlInfo sqlInfo = kafkaService.parseSql("select * from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 ");
//介面卡獲取資料來源,主要是從上述的sqlInfo物件中去poll資料
List<KafkaResult> results = KafkaConsumerAdapter.executor(sqlInfo);
//執行查詢
query(sqlInfo.getTableName(), results, sqlInfo.getSql());
sqlInfo = kafkaService.parseSql("select * from `cmdb-calltopo` where `partition` in (0,1,2) AND msg like '%account%' limit 1000 ");
results = KafkaConsumerAdapter.executor(sqlInfo);
query(sqlInfo.getTableName(), results, sqlInfo.getSql());
sqlInfo = kafkaService.parseSql("select count(*) AS addad from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 ");
results = KafkaConsumerAdapter.executor(sqlInfo);
query(sqlInfo.getTableName(), results, sqlInfo.getSql());
}
private static void query(String tableName, List<KafkaResult> results,
String sql) throws Exception {
//建立model.json,設定我的SchemaFactory,設定庫名
String model = createTempJson();
//設定我的表結構,表名稱和表欄位名以及型別
KafkaTableSchema.generateSchema(tableName, results);
Properties info = new Properties();
info.setProperty("lex", Lex.JAVA.toString());
Connection connection = DriverManager.getConnection(Driver.CONNECT_STRING_PREFIX + "model=inline:" + model, info);
Statement st = connection.createStatement();
//執行
ResultSet result = st.executeQuery(sql);
ResultSetMetaData rsmd = result.getMetaData();
List<Map<String, Object>> ret = new ArrayList<>();
while (result.next()) {
Map<String, Object> map = new LinkedHashMap<>();
for (int i = 1; i <= rsmd.getColumnCount(); i++) {
map.put(rsmd.getColumnName(i), result.getString(rsmd.getColumnName(i)));
}
ret.add(map);
}
result.close();
st.close();
connection.close();
}
private static void print(ResultSet resultSet) throws SQLException {
final ResultSetMetaData metaData = resultSet.getMetaData();
final int columnCount = metaData.getColumnCount();
while (resultSet.next()) {
for (int i = 1; ; i++) {
System.out.print(resultSet.getString(i));
if (i < columnCount) {
System.out.print(", ");
} else {
System.out.println();
break;
}
}
}
}
private static String createTempJson() throws IOException {
JSONObject object = new JSONObject();
object.put("version", "1.0");
object.put("defaultSchema", "QAKAFKA");
JSONArray array = new JSONArray();
JSONObject tmp = new JSONObject();
tmp.put("name", "QAKAFKA");
tmp.put("type", "custom");
tmp.put("factory", "kafka.KafkaSchemaFactory");
array.add(tmp);
object.put("schemas", array);
return object.toJSONString();
}
}
model.json
,之前是基於檔案,現在基於text
字串,mode=inline
模式table
裡面