Nebula Graph介紹和SpringBoot環境連線和查詢

2022-10-12 15:00:20

Nebula Graph介紹和SpringBoot環境連線和查詢

轉載請註明來源 https://www.cnblogs.com/milton/p/16784098.html

說明

當前Nebula Graph的最新版本是3.2.1, 根據官方的檔案進行設定
https://docs.nebula-graph.io/3.2.1/14.client/4.nebula-java-client/

Nebula Graph 的一些特點

  1. 支援分散式. 相對於Neo4j, TigerGraph這些圖資料庫, Nebula 是面向分散式設計的, 因此對叢集的支援比較完備, 在規模上上限要高很多. 在實際專案中儲存了180億的點邊, 這個數量對於Neo4j和TigerGraph是比較困難的.
  2. 支援圖空間. 各個圖空間的ID是互不干擾的, 但是在同一個圖空間裡ID的型別和長度必須一致. 注意這個一致約束的是所有的點和邊. Nebula 可以使用int64作為ID, 也可以用字串, 但是字串需要指定一個長度, 例如64個位元組. 相對於只能用長整數的Neo4j, ID設計上更自由靈活.
  3. 點對應的型別叫TAG, 邊對應的型別叫EDGE
    1. TAG和EDGE都會對應一組的屬性(map, 或者說dict)
    2. 一個點可以對多個TAG, 每個TAG一組屬性, 多組屬性. 專案中建議一開始不要用多TAG, 在整個圖結構穩定後, 再做合併
    3. 一個邊只對應一個EDGE, 一組屬性
  4. Nebula 用的是自定義的查詢語法 GQL, 和 cypher 語法基本一樣
  5. 除了點邊的ID和關聯關係外, 只有帶索引的屬性可以查詢. 這點和其它圖資料庫不一樣, 其它資料庫即使沒有索引, 慢是慢點但是不報錯, Nebula直接給你返回錯誤.
  6. 對於返回數量較大的查詢, Nebula會強制查詢必須帶limit
  7. Nebula 單節點穩定性是有問題的, 在3.2.1版本中觀察到偶爾會出現服務自行退出, 如果在生產環境使用, 需要有後臺監控進行心跳檢測和自動啟動

GQL 常用查詢

下面列出一些常用的查詢

-- 列出圖空間
SHOW SPACES;

-- 列出tag(點型別)和edge(邊型別), 需要先 USE 一個圖空間
SHOW TAGS;
SHOW EDGES;

列出某一型別的點和邊

MATCH ()-[e:follow]-() RETURN e
MATCH (v:player) RETURN v

帶條件的查詢, 在結果數量較多時必須帶limit, 否則Nebula會報錯

match (v:ADDRESS)-[e]-() where id(v)==\"ADD:82388116\" return v,e limit 100

基礎設定和使用

在上面的連結中, 提供了最小的設定和測試程式碼

pom.xml 增加包依賴

對於Nebula Graph 3.2.1, 需要使用3.0.0的版本. client的每個版本只能對應特定的一兩個伺服器端版本

<dependency>
	<groupId>com.vesoft</groupId>
	<artifactId>client</artifactId>
	<version>3.0.0</version>
</dependency>

Java呼叫

Java呼叫主要是三部分, 建立連線池, 建立對談, 執行查詢

建立 NebulaPool 連線池

連線到地址127.0.0.1, 埠9669, 連線池大小100. 注意地址和埠是一個列表, Nebula是支援叢集的. 連線時不需要使用者和密碼

NebulaPool pool = new NebulaPool();
try {
	NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
	nebulaPoolConfig.setMaxConnSize(100);
	List<HostAddress> addresses = Arrays.asList(new HostAddress("127.0.0.1", 9669));
	Boolean initResult = pool.init(addresses, nebulaPoolConfig);
	if (!initResult) {
		log.error("pool init failed.");
		return;
	}
} catch ()
//...

建立 Session 對談

建立對談時需要使用者名稱和密碼

Session session = pool.getSession("root", "nebula", false);

執行查詢

建立一個SPACE, 然後使用這個SPACE, 建立一個TAG person, 建立一個EDGE like

String createSchema = "CREATE SPACE IF NOT EXISTS test(vid_type=fixed_string(20)); "
		+ "USE test;"
		+ "CREATE TAG IF NOT EXISTS person(name string, age int);"
		+ "CREATE EDGE IF NOT EXISTS like(likeness double)";
ResultSet resp = session.execute(createSchema);
if (!resp.isSucceeded()) {
	log.error(String.format("Execute: `%s', failed: %s",
			createSchema, resp.getErrorMessage()));
	System.exit(1);
}

新增一個點記錄

String insertVertexes = "INSERT VERTEX person(name, age) VALUES "
		+ "'Bob':('Bob', 10), "
		+ "'Lily':('Lily', 9), "
		+ "'Tom':('Tom', 10), "
		+ "'Jerry':('Jerry', 13), "
		+ "'John':('John', 11);";
ResultSet resp = session.execute(insertVertexes);
if (!resp.isSucceeded()) {
	log.error(String.format("Execute: `%s', failed: %s",
			insertVertexes, resp.getErrorMessage()));
	System.exit(1);
}

查詢

String query = "GO FROM \"Bob\" OVER like "
		+ "YIELD $^.person.name, $^.person.age, like.likeness";
ResultSet resp = session.execute(query);
if (!resp.isSucceeded()) {
	log.error(String.format("Execute: `%s', failed: %s",
			query, resp.getErrorMessage()));
	System.exit(1);
}
printResult(resp);

在 SpringBoot 專案中使用 Nebula Graph

pom.xml 增加包依賴

<dependency>
	<groupId>com.vesoft</groupId>
	<artifactId>client</artifactId>
	<version>3.0.0</version>
</dependency>

Session工廠: NebulaSessionFactory.java

配合@Bean(destroyMethod = "close"), 建立一個工廠類, 接收pool並實現close()方法

public class NebulaSessionFactory {
    private final NebulaPool pool;
    private final String username;
    private final String password;

    public NebulaSessionFactory(NebulaPool pool, String username, String password) {
        this.pool = pool;
        this.username = username;
        this.password = password;
    }

    public Session getSession() {
        try {
            return pool.getSession(username, password, false);
        } catch (NotValidConnectionException|IOErrorException|AuthFailedException|ClientServerIncompatibleException e) {
            throw new RuntimeException("Nebula session exception", e);
        }
    }

    public void close() {
        pool.close();
    }
}

為什麼不直接將 NebulaPool 設定為Bean? 因為 Session 每次建立時需要帶使用者名稱密碼, 將密碼作為config注入到每個Service中肯定是大家都不願意看到的.

設定修改: application.yml

  • 這裡的值如果不打算使用profile設定, 可以直接寫入
  • hosts是逗號分隔的地址埠列表, 例如 10.22.33.33:9669,10.22.33.34:9669
myapp:
  nebula:
    hosts: @nebula.hosts@
    username: @nebula.username@
    password: @nebula.password@
    max-conn: @nebula.max-conn@

Spring啟動設定: NebulaGraphConfig.java

應用啟動時讀取設定, 建立 NebulaPool, 並範例化 NebulaSessionFactory, destroyMethod = "close", 這個表示在專案shutdown時會呼叫Bean的close方法釋放資源.

@Configuration
public class NebulaGraphConfig {

    @Value("${myapp.nebula.hosts}")
    private String hosts;
    @Value("${myapp.nebula.max-conn}")
    private int maxConn;
    @Value("${myapp.nebula.username}")
    private String username;
    @Value("${myapp.nebula.password}")
    private String password;

    @Bean(destroyMethod = "close")
    public NebulaSessionFactory nebulaSessionFactory() {
        List<HostAddress> hostAddresses = new ArrayList<>();
        String[] hostList = hosts.split(",[ ]*");
        for (String host : hostList) {
            String[] hostParts = host.split(":");
            if (hostParts.length != 2 || !hostParts[1].matches("\\d+")) {
                throw new RuntimeException("Invalid host name set for Nebula: " + host);
            }
            hostAddresses.add(new HostAddress(hostParts[0], Integer.parseInt(hostParts[1])));
        }
        NebulaPoolConfig poolConfig = new NebulaPoolConfig();
        poolConfig.setMaxConnSize(maxConn);
        NebulaPool pool = new NebulaPool();
        try {
            pool.init(hostAddresses, poolConfig);
        } catch (UnknownHostException e) {
            throw new RuntimeException("Unknown Nebula hosts");
        }
        return new NebulaSessionFactory(pool, username, password);
    }
}

Service呼叫

在 Service 中進行呼叫

@Service
@Slf4j
public class GraphServiceImpl implements GraphService {

    @Autowired
    private NebulaSessionFactory sessionFactory;

    @Override
    public <T> NebulaResult<T> query(String graphSpace, String gql) {
        Session session = null;
        try {
            log.info("GQL: {}", gql);
            session = sessionFactory.getSession();
            NebulaResult<Void> res = query(session, "USE " + graphSpace);
            if (!res.isSuccess() || res.getResults() == null || res.getResults().size() == 0) {
                log.error("Failed to use space:{}", graphSpace);
                return null;
            }
            if (!graphSpace.equals(res.getResults().get(0).getSpaceName())) {
                log.error("Failed to use space:{}, result:{}", graphSpace, res.getResults().get(0).getSpaceName());
                return null;
            }
            return query(session, gql);
        } catch (IOErrorException e) {
            log.error(e.getMessage(), e);
            return null;
        } finally {
            if (session != null) {
                session.release();
            }
        }
    }

    private <T> NebulaResult<T> query(Session session, String gql) throws IOErrorException {
        String json = session.executeJson(gql);
        return JacksonUtil.extractByType(json, new TypeReference<>() {});
    }
}

輔助類 NebulaResult.java 等

外層結構

這裡定義了 json 格式響應的外層結構

@Data
public class NebulaResult<T> implements Serializable {
    private List<Error> errors;
    private List<Result<T>> results;

    @JsonIgnore
    public boolean isSuccess() {
        return (errors != null && errors.size() == 1 && errors.get(0).getCode() == 0);
    }

    @Data
    public static class Error implements Serializable {
        private int code;
    }

    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    @JsonInclude(JsonInclude.Include.NON_NULL)
    public static class Result<T> implements Serializable {
        private String spaceName;
        private List<Element<T>> data;
        private List<String> columns;
        private Error errors;
        private long latencyInUs;
    }

    @Data
    public static class Element<T> implements Serializable {
        private List<Meta<T>> meta;
        private List<Serializable> row;
    }

    @Data
    public static class Meta<T> implements Serializable {
        private String type;
        private T id;
    }
}

內層因為區分Edge和Vertex, 結構不一樣. 如果是混合返回的結果, 可以用 Serializable

String gql = "match (v:ADDR)-[e]-() where id(v)==\"ADD:123123\" return v,e limit 100";
        NebulaResult<Serializable> res = graphService.query("insurance", gql);
        log.info(JacksonUtil.compress(res));
        Assertions.assertThat(res).isNotNull();

對於邊, 需要使用結構化的ID

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class EdgeId implements Serializable {
    private int ranking;
    private int type;
    private String dst;
    private String src;
    private String name;
}

用這個結構進行查詢

NebulaResult<EdgeId> res3 = graphService.query("t_test1", "MATCH ()-[e:follow]-() RETURN e");

對於點, ID就是String

NebulaResult<String> res2 = graphService.query("t_test1", "MATCH (v:player) RETURN v");