最系統掌握Flink CDC系列之實時抽取Oracle資料(排雷和調優實踐)

2022-01-18 19:00:28
本篇文章給大家帶來了對 Oracle 的實時資料捕獲以及效能調優,將試用過程中的一些關鍵細節進行分享,希望對大家有幫助。

Flink CDC 於 2021 年 11 月 15 日釋出了最新版本 2.1,該版本通過引入內建 Debezium 元件,增加了對 Oracle 的支援。筆者第一時間下載了該版本進行試用併成功實現了對 Oracle 的實時資料捕獲以及效能調優,現將試用過程中的一些關鍵細節進行分享。

試用環境:

Oracle:11.2.0.4.0(RAC 部署)

Flink:1.13.1

Hadoop:3.2.1

通過 Flink on Yarn 方式部署使用

一、無法連線資料庫

根據官方檔案說明,在 Flink SQL CLI 中輸入以下語句:

create table TEST (A string)
WITH ('connector'='oracle-cdc',
    'hostname'='10.230.179.125',
    'port'='1521',
    'username'='myname',
    'password'='***',
    'database-name'='MY_SERVICE_NAME',
    'schema-name'='MY_SCHEMA',
    'table-name'='TEST' );

之後嘗試通過 select * from TEST 觀察,發現無法正常連線 Oracle,報錯如下:

[ERROR] Could not execute SQL statement. Reason:
oracle.net.ns.NetException: Listener refused the connection with the following error:
ORA-12505, TNS:listener does not currently know of SID given in connect descriptor

從報錯資訊來看,可能是由於 Flink CDC 誤將連線資訊中提供的 MY_SERVICE_NAME (Oracle 的服務名) 錯認為 SID。於是嘗試閱讀 Flink CDC 涉及到 Oracle Connector 的原始碼,發現在 com.ververica.cdc.connectors.oracle.OracleValidator 中,對於 Oracle 連線的程式碼如下:

public static Connection openConnection(Properties properties) throws SQLException {
    DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
    String hostname = properties.getProperty("database.hostname");
    String port = properties.getProperty("database.port");
    String dbname = properties.getProperty("database.dbname");
    String userName = properties.getProperty("database.user");
    String userpwd = properties.getProperty("database.password");
    return DriverManager.getConnection(
            "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbname, userName, userpwd);
}

由上可以看出,在當前版本的 Flink CDC 中,對於 SID 和 Service Name 的連線方式並未做區分,而是直接在程式碼中寫死了 SID 的連線方式 (即 port 和 dbname 中間使用 「 : 」 分隔開)。

從 Oracle 8i 開始,Oracle 已經引入了 Service Name 的概念以支援資料庫的叢集 (RAC) 部署,一個 Service Name 可作為一個資料庫的邏輯概念,統一對該資料庫不同的 SID 範例的連線。據此,可以考慮以下兩種方式:

在 Flink CDC 的 create table 語句中,將 database-name 由 Service Name 替換成其中一個 SID。該方式能解決連線問題,但無法適應主流的 Oracle 叢集部署的真實場景;

對該原始碼進行修改。具體可在新建工程中,重寫 com.ververica.cdc.connectors.oracle.OracleValidator 方法,修改為 Service Name 的連線方式 (即 port 和 dbname 中間使用 「 / 」 分隔開),即:

"jdbc:oracle:thin:@" + hostname + ":" + port + "/" + dbname, userName, userpwd);

筆者採用的就是第二種方法,實現了正常連線資料庫的同時,保留對 Oracle Service Name 特性的使用。

二、無法找到 Oracle 表

按照上述步驟,再次通過 select * from TEST 觀察,發現依然無法正常獲取資料,報錯如下:

[ERROR] Could not execute SQL statement. Reason:
io.debezium.DebeziumException: Supplemental logging not configured for table MY_SERVICE_NAME.MY_SCHEMA.test.  Use command: ALTER TABLE MY_SCHEMA.test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS

觀察到錯誤紀錄檔中提到的表是 MY_SERVICE_NAME.MY_SCHEMA.test,為什麼資料庫名、Schema 名都是大寫,而表名是小寫?

注意到該錯誤由 io.debezium 包報出,通過分析該包的原始碼 (通過 Flink CDC 的 pom.xml 檔案可知,目前使用的是 debezium 1.5.4 版本) 可知,在 io.debezium.relational.Tables 中有如下程式碼:

private TableId toLowerCaseIfNeeded(TableId tableId) {
    return tableIdCaseInsensitive ? tableId.toLowercase() : tableId;
}

可見,Debezium 的開發者將 「大小寫不敏感」 統一定義為了 「需要將表名轉換為小寫」。對於 Debezium 支援的 PostgreSQL、Mysql 等確實如此。然而對於 Oracle 資料庫,「大小寫不敏感」 卻意味著在內部元資訊儲存時,需要將表名轉換為大寫

因而 Debezium 在讀取到 「大小寫不敏感」 的設定後,按照上述程式碼邏輯,只會因為嘗試去讀取小寫的表名而報錯。

由於 Debezium 直到目前最新的穩定版本 1.7.1,以及最新的開發版本 1.8.0 都未修復該問題,我們可以通過以下兩種方法繞過該問題:

如需使用 Oracle 「大小寫不敏感」 的特性,可直接修改原始碼,將上述 toLowercase 修改為 toUppercase (這也是筆者選擇的方法);

如果不願意修改原始碼,且無需使用 Oracle 「大小寫不敏感」 的特性,可以在 create 語句中加上 'debezium.database.tablename.case.insensitive'='false',如下範例:

create table TEST (A string)
WITH ('connector'='oracle-cdc',
    'hostname'='10.230.179.125',
    'port'='1521',
    'username'='myname',
    'password'='***',
    'database-name'='MY_SERVICE_NAME',
'schema-name'='MY_SCHEMA',
'table-name'='TEST',
'debezium.database.tablename.case.insensitive'='false' );

該方法的弊端是喪失了 Oracle 「大小寫不敏感」 的特性,在 'table-name' 中必須顯式指定大寫的表名。

需要註明的是,對於 database.tablename.case.insensitive 引數,Debezium 目前僅對 Oracle 11g 預設設定為 true,對其餘 Oracle 版本均預設設定為 false。所以讀者如果使用的不是 Oracle 11g 版本,可無需修改該引數,但仍需顯式指定大寫的表名。

三、資料延遲較大

資料延遲較大,有時需要 3-5 分鐘才能捕捉到資料變化。對於該問題,在 Flink CDC FAQ 中已給出了明確的解決方案:在 create 語句中加上如下兩個設定項:

'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true'

那麼為什麼要這樣做呢?我們依然可以通過分析原始碼和紀錄檔,結合 Oracle Logminer 的工作原理來加深對工具的理解。

對 Logminer 的抽取工作,主要在 Debezium 的 io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource 中 execute 方法進行。為節約篇幅,本文不列出實際的原始碼,僅提煉出關鍵過程繪於下面的流程圖,有興趣的讀者可以對照該流程圖,結合實際原始碼進行分析:

02.png

採用 redo_log_catalog 的方式,可以監控資料表的 DDL 資訊,且由於 archive logs 被永久儲存到磁碟上,可以在資料庫宕機後依然正常獲取到宕機前的所有 DDL 和 DML 操作。但由於涉及到比 online catalog 更多的資訊監控,以及由此帶來的頻繁的紀錄檔切換和紀錄檔轉儲操作,其代價也是驚人的。

根據筆者實際測試情況,如果 debezium.log.mining.strategy 為預設設定 redo_log_catalog,則不僅需要多執行第 ① 步操作 (該操作耗時約為半分鐘到 1 分鐘之間),在第 ④ 步,根據 archived logs 的資料量,耗時也會在 1 分鐘到 4 分鐘之間浮動;在第 ⑤ 步,實際查詢 V$LOGMNR_CONTENTS 檢視也常常需要十幾秒才能完成。

此外,由於 archive logs 在實際系統中增長速度較快,因此在實際使用中,常會配合進行定期刪除或轉儲過期紀錄檔的操作。由於上述第 ④ 步的耗時較長,筆者觀察到在第 ④ 步執行的過程中,在一定概率下會發生第 ② 步加入的a rchive logs 已過期而被刪除轉儲的情況,於是在第 ⑤ 步查詢的時候,會由於找不到第 ② 步加入的紀錄檔,而報下面的錯誤:

ORA-00308: cannot open archive log '/path/to/archive/log/...'
ORA-27037: unable to obtain file status

一般來說,Flink CDC 所需要監控的表,特別是對於業務系統有重大意義的表,一般不會進行 DDL 操作,僅需要捕捉 DML 操作即可,且對於資料庫宕機等極特殊情況,也可使用在資料庫恢復後進行全量資料更新的方式保障資料的一致性。因而,online_catalog 的方式足以滿足我們的需要。

另外,無論使用 online_catalog,還是預設的 redo_log_catalog,都會存在第 ② 步找到的紀錄檔和第 ⑤ 步實際需要的紀錄檔不同步的問題,因此,加入 'debezium.log.mining.continuous.mine'='true' 引數,將實時蒐集紀錄檔的工作交給 Oracle 自動完成,即可規避這一問題。

筆者按照這兩個引數設定後,資料延遲一般可以從數分鐘降至 5 秒鐘左右。

四、調節引數繼續降低資料延遲

上述流程圖的第 ③ 步和第 ⑦ 步,提到了根據設定項來確定 LogMiner 監控時序範圍,以及確定休眠時間。下面對該過程進行進一步分析,並對單個表的進一步調優給出一般性的方法論。

通過觀察 io.debezium.connector.oracle.logminer.LogMinerHelper 類中的 getEndScn 方法,可瞭解到 debezium 對監控時序範圍和休眠時間的調節原理。為便於讀者理解,將該方法用流程圖說明如下:

03.png

從上述的流程圖中可以看出,debezium 給出 log.mining.batch.size.* 和 log.mining.sleep.time.* 兩組引數,就是為了讓每一次 logMiner 執行的步長能夠儘可能和資料庫自身 SCN 增加的步長一致。由此可見:

log.mining.batch.size.* 和 log.mining.sleep.time.* 引數的設定,和資料庫整體的表現有關,和單個表的資料變化情況無關;

log.mining.batch.size.default 不僅僅是監控時序範圍的起始值,還是監控時序範圍變化的閾值。所以如果要實現更靈活的監控時序範圍調整,可考慮適當減小該引數;

由於每一次確定監控時序範圍時,都會根據 topScn 和 currentScn 的大小來調整 sleepTime,所以為了實現休眠時間更靈活的調整,可考慮適當增大 log.mining.sleep.time.increment.ms;

log.mining.batch.size.max 不能過小,否則會有監控時序範圍永遠無法追上資料庫當前 SCN 的風險。為此,debezium 在 io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics 中存在以下邏輯:

if (currentBatchSize == batchSizeMax) {
    LOGGER.info("LogMiner is now using the maximum batch size {}. This could be indicative of large SCN gaps", currentBatchSize);
}

如果當前的監控時序範圍達到了 log.mining.batch.size.max,那麼 debezium 會在紀錄檔中給出如上提示。在實際應用中,觀察 Flink CDC 產生的 log 是否包含該提示,便可得知 log.mining.batch.size.max 的值是否合理。

五、Debezium Oracle Connector 的隱藏引數

事實上從上文中我們已經瞭解到了兩個隱藏引數:debezium.database.tablename.case.insensitive (見第二節內容) 和 debezium.log.mining.continuous.mine (見第三節內容),這兩個引數在 Debezium 的官方檔案中均未給出實際說明,但實際上可以使用。通過分析原始碼,現給出 Debezium Oracle Connector 的所有隱藏引數,以及其說明如下:

04.png

筆者認為除了上面我們已經用到的兩個引數以外,同樣值得重點關注的是 log.mining.history.recorder.class 引數。由於該引數目前預設為 io.debezium.connector.oracle.logminer.NeverHistoryRecorder,是一個空類,所以我們在分析 Flink CDC 行為時,通過自定義實現 io.debezium.connector.oracle.logminer.HistoryRecorder 介面的類,可在不修改原始碼的情況下,實現對 Flink CDC 行為的個性化監控。

以上就是最系統掌握Flink CDC系列之實時抽取Oracle資料(排雷和調優實踐)的詳細內容,更多請關注TW511.COM其它相關文章!