Asynq: Golang distributed task queue library
最近開發一個 go 專案, 有異步任務的需求, 在 python 上大家通常會使用 celery, golang 常見的則是 machinery, 不過今天要跟各位介紹的套件為 Asynq, Github link, backend 使用 redis, 官網上宣稱其為簡單、可依賴、高效的 go 分散式佇列任務。
作者 Ken Hibino, 任職於 Google, 文檔寫得很完整, 實際用下來真的是挺棒的, 推薦給大家。
官網上列出的幾項特點, 挑一些我覺得重要的點放上來:
- 任務排程
- 任務寫進 redis 後會持久化
- 任務執行失敗會自動 retry
- 任務權重優先制
- 可使用 unique-option 來避免任務重複執行
- 任務可以設定執行時間或是最久可執行的時間
- 定時發送任務
- 支援 redis cluster 及 redis sentinal 來達到高可用性
- 作者提供 Wen UI & CLI tool 讓大家查看任務執行狀況
個人覺得做異步任務很重要的幾點, Asynq 都有達到需求:
- 減輕 API server 的負擔, 透過把不及時的任務指派給相對應的 worker 在背景處理, 並且可以監控 worker 是否有異常情形。
- 當業務量變大時, 可以輕鬆的增加 worker 數量來做水平擴展。
- 定時發送任務。
以下就開始介紹如何使用吧!
範例 code 都放在這個 github 上, 想在本地上跑的可以直接拿來跑
首先要在你的 project(版本需 go 1.13以上) 上安裝 asynq library
go get -u github.com/hibiken/asynq
確保你的 redis 已經正確運行, 即可以開始封裝 task creation & task handling
package tasksimport (
"context"
"fmt"
"time"
"github.com/hibiken/asynq"
)const (
// TypeHeartBeat is a name of the task type
TypeHeartBeat = "heartbeat"
)//----------------------------------------------
// 建立一個 task
// task 須包含 type & payload, 並被封裝在 asynq.Task struct 中
//----------------------------------------------// HeartBeatTask payload.
func HeartBeatTask(id int) *asynq.Task {
// Specify task payload.
payload := map[string]interface{}{
"user_id": id, // set user ID
} // Return a new task with given type and payload.
return asynq.NewTask(TypeHeartBeat, payload)
}
//---------------------------------------------------------------
// 建立 handle task 來處理進來的 input
// 注意須符合 asynq.HandlerFunc 的 interface
// 當 task 被觸發時, 由 handler 來處理我們的商業邏輯
//---------------------------------------------------------------// HandlHeartBeatTask handler.
func HandlHeartBeatTask(c context.Context, t *asynq.Task) error {
// Get user ID from given task.
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
time := time.Now()
fmt.Printf("heart beat user_id %d, now is %v\n", id, time)
// Do something you want
... return nil
}
程式裡可使用 asynq.RedisClientOpt 來連接 redis
redisclient = asynq.RedisClientOpt{
Addr: "localhost:6379",
DB: 1,
}
Comsumer Worker
WorkerConsumer function 這裡會使用建立一個 worker 來處理由 client 或是自己server 發出的定時任務, 以這個例子來看, 這個 worker 會接收 critical, default & low 三個 queue的任務, 根據 task type 來執行相對應的 handler function
當你的服務需求變複雜, 流量增大時, 可以輕鬆的啟動多台 workers 來增加 throughput, 也可以自定義 queue 來讓特定的任務送到指定的 queue 中, Asynq 給予很大的彈性讓我們來客製化
// main.gopackage main// WorkerConsumer function
func WorkerConsumer() {
worker := asynq.NewServer(redisclient, asynq.Config{
// Specify how many concurrent workers to use.
Concurrency: 10,
// Optionally specify multiple queues with different priority.
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
},
})
mux := asynq.NewServeMux()
// Define a task handler for the welcome email task.
mux.HandleFunc(
tasks.TypeHeartBeat, // task type
tasks.HandlHeartBeatTask, // handler function
) // Run worker server.
if err := worker.Run(mux); err != nil {
log.Fatal(err)
}
}
Client
這邊為模擬用戶發送異步任務, HeartBeatTask function 為返回一個asynq 的task, 並帶上 type nema 及 payload, 把 task1, task2 分別丟進 critical 跟 low 的 queue 中, 在前面定義好的 worker 就會經由 redis 接到相對應得任務而開始執行了! (tasks2 多了一個參數, 在此為延後10秒後執行)
func ClientProducer() {
client := asynq.NewClient(redisclient)
for id := 1; id < 5; id++ {
task1 := tasks.HeartBeatTask(id)
task2 := tasks.HeartBeatTask(id * 100)
// Process the task immediately in critical queue.
if _, err := client.Enqueue(
task1, // task payload
asynq.Queue("critical"), // set queue for task
); err != nil {
log.Fatal(err)
}
delay := 10 * time.Second
if _, err := client.Enqueue(
task2, // task payload
asynq.Queue("low"), // set queue for task
asynq.ProcessIn(delay), // set time to process task
); err != nil {
log.Fatal(err)
}
}
}
Period Task
不少人都會有需要發定時任務的需求, 在每天固定時間執行某些功能或是美多久時間做一個任務等等的, 在 asynq 中, 定義好 task 後, 使用 schedile.Register 即可以做到定時發任務的效果了, 簡單又方便, 不過定時任務最小單位為分鐘, 如果想要精準到秒的話只能夠只用 every 這關鍵字了, 可參考下方範例:
// BeatProducer function
func BeatProducer() {
scheduler := asynq.NewScheduler(
redisclient,
&asynq.SchedulerOpts{},
)
task1 := tasks.HeartBeatTask(1)
task2 := tasks.HeartBeatTask(2) // You can use cron spec string to specify the schedule.
// The cron parameter is (minute, hour, day, month, day of week)
entryID, err := scheduler.Register("* * * * *", task1, asynq.Queue("critical"))
if err != nil {
log.Fatal(err)
}
log.Printf("registered an every minute entry: %q\n", entryID) // You can use "@every <duration>" to specify the interval.
entryID, err = scheduler.Register("@every 60s", task2, asynq.Queue("default"))
if err != nil {
log.Fatal(err)
}
log.Printf("registered an every 60s entry: %q\n", entryID)
if err := scheduler.Run(); err != nil {
log.Fatal(err)
}
}
重要的組件都介紹到了, 實際跑起來看看效果如何吧, 要起3個服務(worker, client & period task server)
分別在不同終端機上執行:
TYPE=worker go run main.go
TYPE=client go run main.go
TYPE=beat go run main.go
結果:
client 執行時, 會直接送出 id 為 1~4 的 4 個任務, 並送出 id 100, 200, 300, 400 任務且延後10秒在執行, 由圖一可看到前4筆 log 時間與 client 啟動時一致, 10秒後印出了另外的4個 tasks。
beat 則為定時任務, 定義了兩個, 其中 id=1 為每分鐘執行, 另一個id=2則是 server 啟動後的每 60 秒執行一次, 可看到執行結果與預期相符!
作者另外也提供了 Web UI 服務, 在 github 上的 docker-compose file 上我有加, 打開你的 localhost:8080 後會可以看到任務以及你的 queue 的執行狀況啦。
Conclusion:
這次介紹的 golang 異步任務套件 Asynq, 使用起來簡單易上手, 也提供多種功能, 大家有興趣的話可以到他的官網上看看, 詳細的列了許多功能及介紹,
最後感謝您的觀看, 有錯誤的地方或是有疑問的地方歡迎留言給我。