「持續(xù)更新中,歡迎關(guān)注...」
1. 需求:
作為一家專注于三維高精度地圖服務(wù)的公司,有海量(PB級(jí))的原始數(shù)據(jù)、中間數(shù)據(jù)、成功數(shù)據(jù),需要存儲(chǔ)、管理、并定期歸檔。
按項(xiàng)目管理數(shù)據(jù),數(shù)據(jù)分類航飛數(shù)據(jù)、控制點(diǎn)數(shù)據(jù)、中間數(shù)據(jù)、成果數(shù)據(jù)、其他數(shù)據(jù)。數(shù)據(jù)來(lái)源包括無(wú)人機(jī)數(shù)據(jù)、載荷數(shù)據(jù)、地面站數(shù)據(jù)、人工打點(diǎn)數(shù)據(jù)等。不同渠道匯集而來(lái)的數(shù)據(jù)。采用類似百度網(wǎng)盤的形式,上傳、下載,支持?jǐn)帱c(diǎn)續(xù)傳、進(jìn)度跟蹤。支持細(xì)化到文件級(jí)別的權(quán)限控制,以及更多的文件(夾)屬性。2. 分析:系統(tǒng)重點(diǎn)在于數(shù)據(jù)存儲(chǔ)的選型,支持海量數(shù)據(jù)的存儲(chǔ),能夠支持在復(fù)雜網(wǎng)絡(luò)下的數(shù)據(jù)上傳。選用CEPH作為數(shù)據(jù)存儲(chǔ),RGW對(duì)象存儲(chǔ),S3協(xié)議上傳下載,完美支持分片和斷點(diǎn)續(xù)傳。系統(tǒng)難點(diǎn)在于文件級(jí)別的業(yè)務(wù)權(quán)限控制,以及文件(夾)更多的屬性支持。CEPH RGW本身支持權(quán)限控制海量數(shù)據(jù)管理――文檔和圖像的壓縮和索引,但是無(wú)法和業(yè)務(wù)權(quán)限做對(duì)接。對(duì)象存儲(chǔ)本身沒(méi)有文件夾的概念,無(wú)法對(duì)文件夾做分類、數(shù)量展示、大小展示。所以實(shí)現(xiàn)自定義索引服務(wù),CEPH主要負(fù)責(zé)存儲(chǔ),自定義索引服務(wù)實(shí)現(xiàn)展示與查詢。由于上傳下載會(huì)經(jīng)過(guò),選擇通過(guò)lua腳本,收集流經(jīng)的文件信息,經(jīng)由kafka轉(zhuǎn)發(fā)到業(yè)務(wù)服務(wù)中進(jìn)行業(yè)務(wù)處理應(yīng)用。3. 實(shí)現(xiàn)
系統(tǒng)重點(diǎn)在于海量數(shù)據(jù)上傳的可靠性與海量數(shù)據(jù)索引的管理。
3.1 架構(gòu)
上傳助手就是類百度網(wǎng)盤的桌面端軟件海量數(shù)據(jù)管理――文檔和圖像的壓縮和索引,采用 JS實(shí)現(xiàn)。主要實(shí)現(xiàn)功能:項(xiàng)目展示、上傳、下載。業(yè)務(wù)層包括網(wǎng)關(guān)服務(wù)、賬號(hào)服務(wù)、項(xiàng)目服務(wù)、文件索引服務(wù)等。采用Java + Boot + Cloud技術(shù)棧。其中重點(diǎn)服務(wù)是文件索引服務(wù)Index ,負(fù)責(zé)海量文件的索引維護(hù)和查詢。業(yè)務(wù)數(shù)據(jù)MySQL集群+Redis集群,海量文件存儲(chǔ)使用CEPH對(duì)象存儲(chǔ),支持S3 API。3.3 關(guān)鍵流程圖
上傳助手使用普通的Put 請(qǐng)求上傳文件,加上自定義的字段(項(xiàng)目ID、用戶ID等)即可完成數(shù)據(jù)的提交。使用proxy模式將文件請(qǐng)求轉(zhuǎn)發(fā)到 CEPH RGW,由RGW完成后臺(tái)數(shù)據(jù)存儲(chǔ)處理。在RGW完成數(shù)據(jù)存儲(chǔ)以后,調(diào)用將對(duì)應(yīng)請(qǐng)求的用戶自定義和文件屬性轉(zhuǎn)發(fā)到后臺(tái)Kafka。文件索引服務(wù)(Index )從Kafka中消費(fèi)任務(wù),拿到每個(gè)文件的信息。文件索引服務(wù)(Index )對(duì)文件數(shù)據(jù)按業(yè)務(wù)要求進(jìn)行處理后,存入MySQL數(shù)據(jù)庫(kù)。3.4 示例代碼
.lua:從獲取文件信息,并發(fā)往Kafka
local cjson = require "cjson"
local producer = require "resty.kafka.producer"
local broker_list = {
{ host = "172.16.0.20", port = 9092 },
}
function send_job_to_kafka()
local log_json = {}
local req_headers_ = ngx.req.get_headers()
for k, v in pairs(req_headers_) do
if k == "content-length" then
log_json["contentLength"] = tostring(v)
end
if k == "u-id" then
log_json["uId"] = tostring(v)
end
if k == "p-id" then
log_json["pId"] = tostring(v)
end
end
local resp_headers_ = ngx.resp.get_headers()
for k, v in pairs(resp_headers_) do
if k == "etag" then
log_json["etag"] = string.gsub(v, "\"", "")
break
end
end
log_json["uri"] = ngx.var.uri
log_json["host"] = ngx.var.host
log_json["remoteAddr"] = ngx.var.remote_addr
log_json["status"] = ngx.var.status
local message = cjson.encode(log_json);
ngx.log(ngx.ERR, "message is[", message, "]")
return message
end
--local is_args = ngx.var.is_args
local request_method = ngx.var.request_method
local status_code = ngx.var.status
-- 過(guò)濾Put Object成功的請(qǐng)求,記錄相應(yīng)的metadata及請(qǐng)求ID,并轉(zhuǎn)發(fā)到kafka
if request_method == "PUT" and status_code == "200" then
local bp = producer:new(broker_list, { producer_type = "async" })
local ok, err = bp:send("ceph_lua_test", nil, send_job_to_kafka())
if not ok then
ngx.log(ngx.ERR, "kafka send err:", err)
return
end
ngx.log(ngx.ERR, "kafka send success:", ok)
end
4. 總結(jié)通過(guò)此架構(gòu)方案,在海量文件歸檔過(guò)程中,將文件基本信息異步導(dǎo)入到業(yè)務(wù)數(shù)據(jù)庫(kù)中,便于業(yè)務(wù)應(yīng)用開發(fā)。此架構(gòu)一般也應(yīng)用對(duì)象存儲(chǔ)的多媒體文件處理,比如圖片處理、視頻處理、加水印、鑒黃、事件通知等。