Asynq: Golang distributed task queue library

郭政旻 (Nick Kuo)
10 min readJul 4, 2021

最近開發一個 go 專案, 有異步任務的需求, 在 python 上大家通常會使用 celery, golang 常見的則是 machinery, 不過今天要跟各位介紹的套件為 Asynq, Github link, backend 使用 redis, 官網上宣稱其為簡單、可依賴、高效的 go 分散式佇列任務。

作者 Ken Hibino, 任職於 Google, 文檔寫得很完整, 實際用下來真的是挺棒的, 推薦給大家。

官網上列出的幾項特點, 挑一些我覺得重要的點放上來:

  • 任務排程
  • 任務寫進 redis 後會持久化
  • 任務執行失敗會自動 retry
  • 任務權重優先制
  • 可使用 unique-option 來避免任務重複執行
  • 任務可以設定執行時間或是最久可執行的時間
  • 定時發送任務
  • 支援 redis cluster 及 redis sentinal 來達到高可用性
  • 作者提供 Wen UI & CLI tool 讓大家查看任務執行狀況

個人覺得做異步任務很重要的幾點, Asynq 都有達到需求:

  1. 減輕 API server 的負擔, 透過把不及時的任務指派給相對應的 worker 在背景處理, 並且可以監控 worker 是否有異常情形。
  2. 當業務量變大時, 可以輕鬆的增加 worker 數量來做水平擴展。
  3. 定時發送任務。

以下就開始介紹如何使用吧!

範例 code 都放在這個 github 上, 想在本地上跑的可以直接拿來跑

首先要在你的 project(版本需 go 1.13以上) 上安裝 asynq library

go get -u github.com/hibiken/asynq

確保你的 redis 已經正確運行, 即可以開始封裝 task creation & task handling

package tasksimport (
"context"
"fmt"
"time"
"github.com/hibiken/asynq"
)
const (
// TypeHeartBeat is a name of the task type
TypeHeartBeat = "heartbeat"
)
//----------------------------------------------
// 建立一個 task
// task 須包含 type & payload, 並被封裝在 asynq.Task struct 中
//----------------------------------------------
// HeartBeatTask payload.
func HeartBeatTask(id int) *asynq.Task {
// Specify task payload.
payload := map[string]interface{}{
"user_id": id, // set user ID
}
// Return a new task with given type and payload.
return asynq.NewTask(TypeHeartBeat, payload)
}

//---------------------------------------------------------------
// 建立 handle task 來處理進來的 input
// 注意須符合 asynq.HandlerFunc 的 interface
// 當 task 被觸發時, 由 handler 來處理我們的商業邏輯
//---------------------------------------------------------------
// HandlHeartBeatTask handler.
func HandlHeartBeatTask(c context.Context, t *asynq.Task) error {
// Get user ID from given task.
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
time := time.Now()
fmt.Printf("heart beat user_id %d, now is %v\n", id, time)

// Do something you want
...
return nil
}

程式裡可使用 asynq.RedisClientOpt 來連接 redis

redisclient = asynq.RedisClientOpt{
Addr: "localhost:6379",
DB: 1,
}

Comsumer Worker

WorkerConsumer function 這裡會使用建立一個 worker 來處理由 client 或是自己server 發出的定時任務, 以這個例子來看, 這個 worker 會接收 critical, default & low 三個 queue的任務, 根據 task type 來執行相對應的 handler function

當你的服務需求變複雜, 流量增大時, 可以輕鬆的啟動多台 workers 來增加 throughput, 也可以自定義 queue 來讓特定的任務送到指定的 queue 中, Asynq 給予很大的彈性讓我們來客製化

// main.gopackage main// WorkerConsumer function
func WorkerConsumer() {
worker := asynq.NewServer(redisclient, asynq.Config{
// Specify how many concurrent workers to use.
Concurrency: 10,
// Optionally specify multiple queues with different priority.
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
},
})

mux := asynq.NewServeMux()
// Define a task handler for the welcome email task.
mux.HandleFunc(
tasks.TypeHeartBeat, // task type
tasks.HandlHeartBeatTask, // handler function
)
// Run worker server.
if err := worker.Run(mux); err != nil {
log.Fatal(err)
}
}

Client

這邊為模擬用戶發送異步任務, HeartBeatTask function 為返回一個asynq 的task, 並帶上 type nema 及 payload, 把 task1, task2 分別丟進 critical 跟 low 的 queue 中, 在前面定義好的 worker 就會經由 redis 接到相對應得任務而開始執行了! (tasks2 多了一個參數, 在此為延後10秒後執行)

func ClientProducer() {
client := asynq.NewClient(redisclient)
for id := 1; id < 5; id++ {
task1 := tasks.HeartBeatTask(id)
task2 := tasks.HeartBeatTask(id * 100)
// Process the task immediately in critical queue.
if _, err := client.Enqueue(
task1, // task payload
asynq.Queue("critical"), // set queue for task
); err != nil {
log.Fatal(err)
}
delay := 10 * time.Second
if _, err := client.Enqueue(
task2, // task payload
asynq.Queue("low"), // set queue for task
asynq.ProcessIn(delay), // set time to process task
); err != nil {
log.Fatal(err)
}
}
}

Period Task

不少人都會有需要發定時任務的需求, 在每天固定時間執行某些功能或是美多久時間做一個任務等等的, 在 asynq 中, 定義好 task 後, 使用 schedile.Register 即可已做到定時發任務的效果了, 簡單又方便, 不過定時任務最小單位為分鐘, 如果想要精準到秒的話只能夠只用 every 這關鍵字了, 可參考下方範例:

// BeatProducer function
func BeatProducer() {
scheduler := asynq.NewScheduler(
redisclient,
&asynq.SchedulerOpts{},
)
task1 := tasks.HeartBeatTask(1)
task2 := tasks.HeartBeatTask(2)
// You can use cron spec string to specify the schedule.
// The cron parameter is (minute, hour, day, month, day of week)
entryID, err := scheduler.Register("* * * * *", task1, asynq.Queue("critical"))
if err != nil {
log.Fatal(err)
}
log.Printf("registered an every minute entry: %q\n", entryID)
// You can use "@every <duration>" to specify the interval.
entryID, err = scheduler.Register("@every 60s", task2, asynq.Queue("default"))
if err != nil {
log.Fatal(err)
}
log.Printf("registered an every 60s entry: %q\n", entryID)
if err := scheduler.Run(); err != nil {
log.Fatal(err)
}
}

重要的組件都介紹到了, 實際跑起來看看效果如何吧, 要起3個服務(worker, client & period task server)

分別在不同終端機上執行:

TYPE=worker go run main.go
TYPE=client go run main.go
TYPE=beat go run main.go

結果:

client 執行時, 會直接送出 id 為 1~4 的 4 個任務, 並送出 id 100, 200, 300, 400 任務且延後10秒在執行, 由圖一可看到前4筆 log 時間與 client 啟動時一致, 10秒後印出了另外的4個 tasks。

beat 則為定時任務, 定義了兩個, 其中 id=1 為每分鐘執行, 另一個id=2則是 server 啟動後的每 60 秒執行一次, 可看到執行結果與預期相符!

作者另外也提供了 Web UI 服務, 在 github 上的 docker-compose file 上我有加, 打開你的 localhost:8080 後會可以看到任務以及你的 queue 的執行狀況啦。

Conclusion:

這次介紹的 golang 異步任務套件 Asynq, 使用起來簡單易上手, 也提供多種功能, 大家有興趣的話可以到他的官網上看看, 詳細的列了許多功能及介紹,

最後感謝您的觀看, 有錯誤的地方或是有疑問的地方歡迎留言給我。

--

--