RE: 從零開始的 Data Pipeline(一) — Data Collector

**哈囉,大家今天過得好嗎?**在 Data Pipeline 這系列的上一篇文章中已經介紹過了整個 Data Pipeline 大概長什麼樣子?會有哪些需要用到的東西?如果還不熟悉的可以先去看一下

RE: 從零開始的 Data Pipeline — 序章

這篇將會專門介紹 Data Collector 這個角色。


什麼是 Data Collector

在 Data Pipeline 中勢必會有在不同服務間傳送資料的東西存在。但如果每個環節都要從頭寫的話,不僅是費時費力,更是會變得混亂難管理,相信任何人都不會想要接手這種東西,因此有了 Data Collector 的出現,最常見的有兩個:FluentdLogstash

兩者間主要的不同點在於 Fluentd 需要依賴 Ruby 跟 Gem,也可以用 td-agent 來建立獨立的 Ruby 環境。而 Logstash 則是需要 Java 環境。


Data Collector 可以做什麼

依照我們的應用情境來說,我們會在幾個不同的環節中使用到

  1. Application server:追蹤服務產生的 Log 並送到 Google Pub/Sub(GCP 上類似 Apache Kafka 的服務)
  2. 從 Google Pub/Sub 抓資料下來做點處理,然後丟上 Google Cloud Storage 儲存

由以上兩點可以發現,在傳送、處理、接收資料時會用到 Data Collector,有了這個就可以大幅度簡化整體架構的複雜度。

此外 Fluentd 可以很容易的串連,可以透過內建的 forward 方法在不同的 Fluentd 之間傳遞,也可以透過上述的 Google Pub/Sub 做為資料中繼站。


安裝 Fluentd

雖然不免俗地寫了這個章節,但我覺得照著官方的 安裝指南 實在很簡單,所以這邊就不多贅述。

值得一提的地方在於,在非 container 的環境中,我建議使用 td-agent來啟動 Fluentd 服務。這是由 Treasure Data 提供的一個在 Fluentd 之外包了一層管理工具的版本,具有獨立的 Ruby 環境且較容易管理 Fluentd 的服務(包含自動重啟服務、log rotation 等),可以省去大量的麻煩。


你的第一個 Fluentd 設定檔

Fluentd 的設定檔中會有幾個環節

  1. <source> 這邊設定了 input 端,常見的有 tail、forward 等
  2. <filter> 對資料的過濾、處理等步驟(非必要)
  3. <match> 這邊是定義資料的 output 端

貫串了整個設定檔的東西是 tag,Fluentd 會透過 tag 來把每筆資料分配到他該去的地方。接著我們就來動手寫一個會去追蹤 nginx log 的設定檔。

<source>
  @type tail
  path /fluentd/nginx/access.log
  pos_file /fluentd/log/nginx-access.log.pos
  tag nginx.access

  <parse>
    @type nginx
    keep_time_key true
  </parse>
</source>

<match nginx.access>
  @type stdout
</match>

Input

type 每當指定的檔案有新的一行資料進去時,Fluentd 就會抓取並執行後續動作。

path 指定追蹤 /fluentd/nginx/access.log 這檔案

pos_file 而目前處理到哪一行則會儲存在 /fluentd/log/nginx-access.log.pos 這檔案中,這樣 Fluentd 重新啟動就可以從上次的位置繼續執行。

tag 這設定會讓從這檔案而來的所有資料都打上nginx.access這個標籤,後續的應用都會認這個標籤。

Output

<match nginx.access> 這代表只會針對有 nginx.access 這標籤的資料執行,而 @type stdout 則是只輸出結果而已

實際應用

設定檔寫完了,接下來範例我會使用 docker 來執行,來看看結果如何。知道如何使用的朋友可以跟著執行看看

# 建立存放 nginx log 的目錄
mkdir nginx

# 啟動 nginx 服務(log 存至 nginx/)
docker run -v $(PWD)/nginx:/var/log/nginx/ -d -p 8080:80 --name nginx-server nginx

# 啟動 Fluentd 服務(抓取 nginx/ 的資料)
docker run -v $(PWD)/nginx:/fluentd/nginx --rm --name fluentd lukehong/nginx-fluentd-example:basic

執行完以上兩個指令後,應該就會有一個背景執行的 nginx container,以及畫面中應該會出現一些 log 說明著 Fluentd 已經開始執行。接著用 browser 進入 http://localhost:8080/ 之後,應該就能看到幾行結果

2019-01-19 15:18:26.000000000 +0000 nginx.access: {"remote":"172.17.0.1","host":"-","user":"-","time":"19/Jan/2019:15:18:26 +0000","method":"GET","path":"/","code":"304","size":"0","referer":"-","agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36","http_x_forwarded_for":"-"}

2019-01-19 15:18:34.000000000 +0000 nginx.access: {"remote":"172.17.0.1","host":"-","user":"-","time":"19/Jan/2019:15:18:34 +0000","method":"GET","path":"/","code":"304","size":"0","referer":"-","agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36","http_x_forwarded_for":"-"}

看到這些就代表成功囉。


進階設定

基本的成功了,那麼接下來我們就要開始進階版的,以下是使用情境

  1. 有多台 application server
  2. 有一台獨立的 Fluentd aggregator

在 1 上會有各自的 Fluentd forwarder 將資料送到 2 整合並且送到 Google Cloud Storage 儲存。這就會需要用到 Fluentd 內建的 in_forwardout_forward 來傳送與接收資料。

這邊會只使用一組 application server 來模擬。

Fluentd forwarder

fluentd-forwarder 的作用一樣是去追蹤 nginx log,但是不再是直接輸出結果,而是透過 forward 的方式送至 fluentd-aggregator ,因此在每一台 application server 上都會有一個 fluentd-forwarder

# fluent.conf
<source>
  @type tail
  tag nginx.access
  ...
</source>

<source>
  @type tail
  tag nginx.error
  ...
</source>

<match nginx.*>
  @type forward
  ...

<server>
    name aggregator
    host fluentd-aggregator
    port 24224
    weight 60
  </server>
</match>

除了原有 nginx.access 這邊標籤的 log 之外,我另外再加上了 nginx.error 這個標籤的 log,讓大家了解標籤是如何運作的。

<match nginx.*> 這用法就是只要主標籤是 nginx 的會成立,但如果想要讓所有以 nginx 為開頭的標籤時,則要使用 <match nginx**>

而在 <server> 中的 host 可以是 IP 或是 hostname等,在我的範例中是使用 docker container link 指定的 hostname。

Fluentd aggregator

fluentd-aggregator 的作用就是收集彙整所有 log,並且 access log 丟上 Google Cloud Storage(GCS)、error log 直接顯示出來。

這篇不會介紹 GCS 的使用方法,想要跟著做的人需要事先準備好可用的 GCP service account keyfile

<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

<match nginx.error>
  @type stdout
</match>

<match nginx.access>
  @type gcs

project [YOUR_PROJECT]
  keyfile /fluentd/etc/[YOUR_KEYFILE_NAME].json
  bucket [YOUR_BUCKET_NAME]
  ...
</match>

Fluentd 的 forward 功能本身就會帶著標籤,因此在這邊的 <source> 中並不需要設定 tag

我這邊特別設計成遇到 nginx.error 這標籤時,就是直接使用 stdout 顯示出來。而遇到 nginx.access 時才會透過 Fluentd 的 fluent-plugin-gcs 這個 plugin 將資料送到 GCS 上面儲存。

要自己實做的人需要將 LukeHong/fluentd-example clone 下來,修改完 advance/aggregator/下的 Dockerfilefluent.conf這兩支檔案中的 [YOUR_PROJECT]``[YOUR_KEYFILE_NAME]``[YOUR_BUCKET_NAME] 這三個填上正確的內容,再 build fluentd-aggregator 的 image。

docker build -t fluentd-aggregator advance/aggregator/

實作應用

# 啟動 nginxdocker run -v $(PWD)/nginx:/var/log/nginx/ -d -p 8080:80 --name nginx-server nginx

# 啟動 aggregator
docker run --name fluentd-aggregator --rm fluentd-aggregator

# 啟動 fluentd-forwarder
docker run --link fluentd-aggregator -v $(PWD)/nginx:/fluentd/nginx -d --rm --name fluentd-forwarder lukehong/nginx-fluentd-example:adv-forwarder

執行完這三個指令後,分別會啟動 nginx-server``fluentd-forwarder``flutentd-aggregator 這三個 container,需要注意的地方是使用了 --link fluentd-aggregator 才能在 fluentd-forwarder中使用 hostname 連到它。

建立完 container 後一樣先到 http://localhost:8080 產生幾筆 access log,再到不存在的頁面(如 http://localhost:8080/error)隨意產生幾筆 error log。

過一段時間後,就能發現 GCS 上確實有資料寫進去了。

$ gsutil ls gs://fluentd-example/logs/2019/01/22/
gs://fluentd-example/logs/2019/01/22/201901220754_0.gz
gs://fluentd-example/logs/2019/01/22/201901220756_0.gz
gs://fluentd-example/logs/2019/01/22/201901220802_0.gz

而且我們隨便亂打路徑產生的 error 也有顯示在 terminal 中。

# fluentd-aggregator stdout
2019-01-22 08:06:41.000000000 +0000 nginx.error: {"time":"2019/01/22 08:06:41","log_level":"error","pid":"6","tid":"6","message":"*13 open() \"/usr/share/nginx/html/error\" failed (2: No such file or directory)","client":"172.17.0.1","server":"localhost","request":"\"GET /error HTTP/1.1\"","host":"\"localhost:8080\""}

2019-01-22 08:06:41.000000000 +0000 nginx.error: {"time":"2019/01/22 08:06:41","log_level":"error","pid":"6","tid":"6","message":"*13 open() \"/usr/share/nginx/html/error\" failed (2: No such file or directory)","client":"172.17.0.1","server":"localhost","request":"\"GET /error HTTP/1.1\"","host":"\"localhost:8080\""}

這麼一來我們進階設定的實驗也成功了。


結語

在這個資料如同石油的年代,如何建立穩定、可靠、容易管理的 Data Pipeline 是一件非常重要的事情,尤其是在架構日趨複雜的時候能使用統一的 Data Collector 就會幫我們省下大量的時間與心力。

希望這篇文章能對你有幫助,如果有什麼疑問、想知道什麼相關的東西等等的歡迎在底下留言。

(刪除線)如果喜歡這篇文章的話歡迎幫我按個「拍手」,想持續看到更多文章也可以按下「追隨」喔。(刪除線)


相關連結

程式在 GitHub LukeHong/fluentd-example

Image 在 Docker hub lukehong/nginx-fluentd-example

Posts in this series