王浩,葛昂,趙晴
?。ㄈA北計算機系統工程研究所,北京 100083)
摘要:為了實現將傳統關系型數據庫中的增量數據快速導入同構或者異構目的庫,在使用已有的增量提取方法的基礎上,提出了通過增加并行度和流式計算的方法加快同步速度。此方法不僅支持插入、更新和刪除的增量數據同步,而且可以抽取出數據庫表結構信息動態支持表結構變更。與傳統單點抽取方式相比,大大提高了目的庫數據的新鮮度。
0引言
隨著大數據技術的發展,越來越多的企業開始構建大數據平臺進行數據處理。然而如何將保存在關系型數據庫中的數據快速同步到大數據平臺組件(例如HBase、HDFS)中,正成為很多企業面臨的問題。Sqoop是常用的數據同步工具,其實質是MapReduce任務,延時較高,而且需要通過定時任務來達到自動化流程效果。本文在觸發器記錄數據變化的基礎上,提出了一種使用Spark Streaming將增量數據抽取出來,然后根據需要寫入到不同的目的庫的方法。由于只提取增量數據,所以較Sqoop減少了數據量。另外由于是流式處理方式,降低了延時。
1增量提取
1.1增量提取的概念
增量提取是針對上一次提取而言,將上一次提取時間點到現在數據庫中插入、更新、刪除的數據提取出來[1]。
1.2常用的增量提取方法
1.2.1基于業務系統日志
在業務中將數據庫DML(Data Manipulation Language)語句輸出以日志的方式存儲,然后通過解析日志將DML語句在目的庫中重放以達到目的。此方法需要侵入業務系統,對于已經成型的業務系統不適用。
1.2.2基于數據庫日志
解析數據庫日志也能達到增量提取的目的,但是各大數據庫廠商不對外開放數據庫系統的日志格式,這就使得解析日志變成了問題。而且各數據庫的日志格式還不盡相同,難以達到通用性。
1.2.3基于觸發器
基于觸發器的方式,目前被廣泛運用于數據庫增量提取。它通過在源表上建立插入、更新、刪除觸發器來記錄對數據的操作。每當有數據變化時,就會觸發相應的觸發器,然后運行觸發器定義的邏輯,將變化記錄到增量表。
1.3基于觸發器方法的具體實現
由于觸發器方法具有實現邏輯簡單,對業務無入侵,數據庫通用等優點,所以本文采用了基于觸發器方式的增量提取方法。具體實現方法如下:
?。?)創建名為dml_log的數據庫表,字段為id、table_name、record_id、execute_date、dml_type。其中id為自增id,table_name存儲要同步的源表表名稱,record_id是源表中發生變化的記錄的唯一標識,execute_date為觸發器執行時的時間戳,dml_type為I、U、D分別代表insert、update、delete操作。
?。?)在源表上創建插入、更新、刪除類型的觸發器。創建語句在此省略。
2構建Spark Streaming程序
2.1Spark Streaming
Spark是目前大數據處理領域比較常用的計算框架。它將中間計算結果維護在內存中,這樣不僅可以做到中間結果的重用,而且減少了磁盤IO,大大加快了計算速度。Spark Streaming是構建于Spark core之上的流式處理模塊。其原理是將流式數據切分成一個個小的片段,以mini batch的形式來處理這一小部分數據,從而模擬流式計算達到準實時的效果。
2.2JdbcRDD
彈性分布式數據集(Resilient Distributed Datasets,RDD),它是Spark數據抽象的基石。RDD是一個只讀的分區記錄集合,分區分散在各個計算節點[2]。RDD提供了transformation和action兩類操作,其中transformation是lazy級別的,主要對數據處理流程進行標記,而不立即進行運算。action操作會觸發作業的提交,然后進行回溯導致transformation操作進行運算。
JdbcRDD擴展自RDD,是RDD的子類。內部通過JDBC(Java Data Base Connectivity)操作以數據庫為源頭構建RDD。其構造函數簽名為:
class JdbcRDD[T: ClassTag](
sc: SparkContext,
getConnection:()=> Connection,
sql: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
mapRow:(ResultSet) => T =
JdbcRDD.resultSetToObjectArray _)
extends RDD[T](sc, Nil) with Logging {…}
2.3具體實現
Spark官方提供用于構建Spark Streaming的數據源沒有對數據庫進行支持,所以本文自己實現對數據庫的支持。編寫繼承自InputDStream類的DirectJdbcInputDStream類,其簽名為:
class DirectJdbcInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
param: JdbcParam) extends
InputDStream[Row] (ssc_) with Logging {…}
對start()、compute()和stop()方法進行重寫。
(1)在start函數中注冊JDBC驅動,用于JDBC獲取初始化信息(構造JdbcRDD時的參數);
(2)compute函數會被框架間隔指定的時間反復調用,其實質是如何返回一個JdbcRDD。首先通過JDBC獲取本次需要拉取的trigger記錄的id的上下界以及表的Schema信息;然后以這些信息為參數生成提取真實數據的SQL,其邏輯為用選中的trigger表中的記錄和原表在record_id上進行左連接;最后使用該SQL當做參數構建JdbcRDD。值得說明的是,構建JdbcRDD時是可以指定并行度的,每個worker節點都會建立到數據庫的JDBC連接,由多個節點并行去數據庫拉取屬于自己的那一部分數據,這就大大增加了提取和處理速度。
(3)在stop函數中關閉JDBC連接??傮w來看,就是在driver程序中執行的JDBC程序獲取初始化參數,在executor中執行的JDBC程序拉取真實的數據。
(4)編寫driver程序:
val sc = new SparkContext(new SparkConf)
val ssc = new StreamingContext(sc, Seconds(30))
val directStream = new DirectJdbcInputDStream[Row](ssc, jdbcParam)
directStream.foreachRDD(rdd => {
…//對數據進行處理
})
2.4限流
假設當前時間點到上次提取的時間點之間新增數據量太大,就會導致在新一次作業提交時,上一次的作業仍然沒有完成,可能會因此造成作業積壓使得系統不穩定。本文使用了基于規則的限流方法,綜合考慮集群處理能力以及間隔時間,可以配置化設置每次最大提取條數。如果當前需要提取的數據條數大于最大提取條數,則本次就只提取最大條數,剩下的延時到下次再進行提取。這樣做的好處是削減了峰流對系統造成的影響。
3測試分析
測試環境:VMware虛擬機,處理器設置為4核心,2 GB內存, 64位CentOS 6.5操作系統,Spark 1.5.1,Oracle 11g。使用4臺虛擬機搭建成Spark集群,1臺為Master,3臺為Worker。數據庫表分別設置為20、40個字段,每次最大抽取記錄數分別設置為10 000、50 000、500 000。將抽取出來的數據寫成parquet格式的文件存儲到hdfs上。測試結果如表1所示。
4結束語
本文在基于數據庫觸發器記錄數據變化的基礎上,通過自己構造DirectJdbcStream類提供Spark Streaming對數據庫的支持,達到準實時從數據庫中抽取出增量數據的目的。并且可以對抽取出來的數據進行過濾、清洗等操作,根據需求靈活地寫入到不同的目的庫。
參考文獻
?。?] 郭亮. 基于MD5與HASH的數據庫增量提取算法及其應用[D]. 長沙:湖南大學,2013.
?。?] ZAHARIA M, CHOWDHURY M, DAS T, et al. Resilient distributed datasets: a fault tolerant abstraction for in memory cluster computing[C]. Usenix Conference on Networked Systems Design & Implementation, 2012, 70(2):141146.
[3] DEAN J, GHEMAWAT S. MapReduce: simplified dataprocessing on large clusters[C]. USENIX Association OSDI′04: 6th Symposium on Operating Systems Design and Implementation, 2004:137149.
?。?] MARTIN O. Programming in scala[M]. California: Artima Press,2010.
[5] YADAV R. Spark cookbook[M]. UK: Packt Publishing Ltd, 2015.
[6] KARAU H. Learning spark[M]. America: O’Reilly Media, Inc. 2015.
?。?] 梁剛. 企業大數據管理解決方案[J]. 微型機與應用,2013,32(24):7 10,13.