91在线一级黄片|91视频在线观看18|成人夜间呦呦网站|91资源欧美日韩超碰|久久最新免费精品视频一区二区三区|国产探花视频在线观看|黄片真人免费三级片毛片|国产人无码视频在线|精品成人影视无码三区|久久视频爱久久免费精品

RELATEED CONSULTING
相關(guān)咨詢(xún)
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
使用Golang構(gòu)建一萬(wàn)+每秒處理請(qǐng)求的高性能系統(tǒng)

背景

一談到golang,大家的第一感覺(jué)就是高并發(fā),高性能。但是語(yǔ)言本身的優(yōu)勢(shì)是不是,就讓程序員覺(jué)得編寫(xiě)高性能的處理系統(tǒng)變得輕而易舉,水到渠成呢。下面這篇文章給大家的提醒便是,我們只有在充分理解語(yǔ)言本身的特性,并巧妙加以利用的前提下,才能寫(xiě)出高性能、高并發(fā)的處理程序,才能為企業(yè)節(jié)省成本,為客戶提供好的服務(wù)。

銅鼓網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)公司!從網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開(kāi)發(fā)、APP開(kāi)發(fā)、自適應(yīng)網(wǎng)站建設(shè)等網(wǎng)站項(xiàng)目制作,到程序開(kāi)發(fā),運(yùn)營(yíng)維護(hù)。創(chuàng)新互聯(lián)公司從2013年創(chuàng)立到現(xiàn)在10年的時(shí)間,我們擁有了豐富的建站經(jīng)驗(yàn)和運(yùn)維經(jīng)驗(yàn),來(lái)保證我們的工作的順利進(jìn)行。專(zhuān)注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)公司。

每分鐘處理百萬(wàn)請(qǐng)求

?Malwarebytes的首席架構(gòu)師Marcio Castilho分享了他在公司高速發(fā)展過(guò)程中,開(kāi)發(fā)高性能數(shù)據(jù)處理系統(tǒng)的經(jīng)歷。整個(gè)過(guò)程向我們?cè)敿?xì)展示了如何不斷的優(yōu)化與提升系統(tǒng)性能的過(guò)程,值得我們思考與學(xué)習(xí)。大佬也不是一下子就給出最優(yōu)方案的。

首先作者的目標(biāo)是能夠處理來(lái)自數(shù)百萬(wàn)個(gè)端點(diǎn)的大量POST請(qǐng)求,然后將接收到的JSON 請(qǐng)求體,寫(xiě)入Amazon S3,以便map-reduce稍后對(duì)這些數(shù)據(jù)進(jìn)行操作。這個(gè)場(chǎng)景和我們現(xiàn)在的很多互聯(lián)網(wǎng)系統(tǒng)的場(chǎng)景是一樣的。傳統(tǒng)的處理方式是,使用隊(duì)列等中間件,做緩沖,消峰,然后后端一堆worker來(lái)異步處理。因?yàn)樽髡咭沧隽藘赡闓O開(kāi)發(fā)了,經(jīng)過(guò)討論他們決定使用GO來(lái)完成這項(xiàng)工作。

第一版代碼

下面是Marcio給出的本能第一反應(yīng)的解決方案,和大家的思路是不是一致的。首先他給出了負(fù)載(Payload)還有負(fù)載集合(PayloadCollection)的定義,然后他寫(xiě)了一個(gè)處理web請(qǐng)求的Handler(payloadHandler)。在payloadHandler里面,由于把負(fù)載上傳S3比較耗時(shí),所以針對(duì)每個(gè)負(fù)載,啟動(dòng)GO的協(xié)程來(lái)異步上傳。具體的實(shí)現(xiàn),大家可以看下面48-50行貼出的代碼。

type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}

type Payload struct {
// [redacted]
}

func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

bucket := S3Bucket

b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}

// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"

return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}

// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- DON'T DO THIS
}

w.WriteHeader(http.StatusOK)
}

那結(jié)果怎么樣呢?Marcio和他的同事們低估了請(qǐng)求的量級(jí),而且上面的實(shí)現(xiàn)方法,又無(wú)法控制GO協(xié)程的生成數(shù)量,這個(gè)版本部署到生產(chǎn)后,很快就崩潰了。Marcio畢竟是牛逼架構(gòu)師,他很快根據(jù)問(wèn)題給出了新的解決方案。

第二版代碼

第一個(gè)版本的假設(shè)是,請(qǐng)求的生命周期都是很短的,不會(huì)有長(zhǎng)時(shí)間的阻塞操作耗費(fèi)資源。在這個(gè)前提下,我們可以根據(jù)請(qǐng)求不停的生成GO協(xié)程來(lái)處理請(qǐng)求。但是事實(shí)并非如此,Marcio轉(zhuǎn)變思路,引入隊(duì)列的思想。創(chuàng)建了Buffered Channel,把請(qǐng)求緩沖起來(lái),然后再通過(guò)一個(gè)同步處理器從Channel里面把請(qǐng)求取出,上傳S3.這是典型的生產(chǎn)者-消費(fèi)者模型。

處理流程

這個(gè)版本的問(wèn)題是,首先同步處理器的處理能力有限,他的處理能力比不上請(qǐng)求到達(dá)的速度。很快Buffered Channel就會(huì)滿了,然后后續(xù)的客戶請(qǐng)求都會(huì)被阻塞。在Marcio他們部署這個(gè)有缺陷的版本幾分鐘后,延遲率會(huì)以固定的速率增加。

系統(tǒng)部署后的延遲

第三版代碼

Marcio引入了2層Channel,一個(gè)Channel用于緩存請(qǐng)求,是一個(gè)全局Channel,本文中就是下面的JobQueue,一個(gè)Channel用于控制每個(gè)請(qǐng)求隊(duì)列并發(fā)多少個(gè)worker.從下面的代碼可以看到,每個(gè)Worker都有兩個(gè)關(guān)鍵屬性,一個(gè)是WorkerPool(這個(gè)也是一個(gè)全局的變量,即所有的worker的這個(gè)屬性都指向同一個(gè),worker在創(chuàng)建后,會(huì)把自身的JobChannel寫(xiě)入WorkerPool完成注冊(cè)),一個(gè)是JobChannel(用于緩存分配需要本worker處理的請(qǐng)求作業(yè))。web處理請(qǐng)求payloadHandler,會(huì)把接收到的請(qǐng)求放到JobQueue后,就結(jié)束并返回。

var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel

select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}

case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}

// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {

// let's create a job with the payload
work := Job{Payload: payload}

// Push the work onto the queue.
JobQueue <- work
}

w.WriteHeader(http.StatusOK)
}

請(qǐng)求任務(wù)都放到JobQueue里面了,如何監(jiān)聽(tīng)隊(duì)列,并觸發(fā)請(qǐng)求呢。這個(gè)地方又出現(xiàn)了Dispatcher,我們?cè)诹硪黄恼轮杏性敿?xì)探討(基于dispatcher模式的事件與數(shù)據(jù)分發(fā)處理器的go語(yǔ)言實(shí)現(xiàn):
https://www.toutiao.com/article/7186518439215841827/)。在系統(tǒng)啟動(dòng)的時(shí)候,我們會(huì)通過(guò)NewDispatcher生成Dispatcher,并調(diào)用它的Run方法。

type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}

go d.dispatch()
}

func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool

// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}

Dispatcher與Worker的關(guān)系如下圖所示:

第三方案整體流程

1.客戶請(qǐng)求到Handler。

2.Handler把請(qǐng)求作業(yè)寫(xiě)入JobQueue。

3.Dispatcher的dispatcher方法,從全局JobQueue中讀取Job。

4.Dispatcher的dispatcher方法同時(shí)也從WorkerPool中讀取JobChannel(屬于某一個(gè)Worker,即每一個(gè)Worker都有一個(gè)JobChannel)。

5.Dispatcher把獲得的Job寫(xiě)入JobChannel,即分配某個(gè)Worker。

6.Worker從自己的JobChannel中獲取作業(yè)并執(zhí)行。執(zhí)行完成后,空閑后,把自己的JobChannel再次寫(xiě)入WorkerPool等待分配。

這樣實(shí)現(xiàn)后,效果明顯,同時(shí)需要的機(jī)器數(shù)量大幅降低了,從100臺(tái)降低到20臺(tái)。

第三方案效果

部署機(jī)器變化

這里的兩層,一層是全局JobQueue,緩存任務(wù)。第二個(gè)是每個(gè)Worker都有自己的執(zhí)行隊(duì)列,一臺(tái)機(jī)器可以創(chuàng)建多個(gè)Worker。這樣就提升了處理能力。

方案對(duì)比

方案思想

實(shí)現(xiàn)難度

方案問(wèn)題

GO協(xié)程原生方法

簡(jiǎn)單

無(wú)法應(yīng)對(duì)大規(guī)模請(qǐng)求,無(wú)法控制協(xié)程數(shù)量

GO 單層Channel

簡(jiǎn)單

當(dāng)處理能力達(dá)不到請(qǐng)求速率后,隊(duì)列滿,系統(tǒng)崩潰

GO兩層Channel

復(fù)雜

參考資料:

http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/。

https://github.com/ReGYChang/zero/blob/main/pkg/utils/worker_pool.go。


文章標(biāo)題:使用Golang構(gòu)建一萬(wàn)+每秒處理請(qǐng)求的高性能系統(tǒng)
當(dāng)前地址:http://m.jiaoqi3.com/article/dpjcjho.html