Tóm Tắt
1.6. Mô hình thực thi đồng thời
Một điểm mạnh của Golang là tích hợp sẵn cơ chế xử lý đồng thời (concurrency). Lý thuyết về hệ thống tương tranh của Go là CSP (Communicating Sequential Process) được đề xuất bởi Hoare vào năm 1978. CSP được áp dụng lần đầu cho máy tính đa dụng T9000 mà Hoare có tham gia. Từ NewSqueak, Alef, Limbo đến Golang hiện tại, Rob Pike, người có hơn 20 năm kinh nghiệm thực tế với CSP, rất quan tâm đến tiềm năng áp dụng CSP vào ngôn ngữ lập trình đa dụng. Khái niệm cốt lõi của lý thuyết CSP cũng được áp dụng vào lập trình concurrency trong Go.
Go concurrency
Ở phần này chúng ta cùng xem qua các cách dùng goroutine cũng như các cách xử lý goroutine mà chúng ta thường gặp trong lúc lập trình.
1.6.1. Phiên bản concurrency với Hello World
Trong hầu hết các ngôn ngữ hiện đại, vấn đề chia sẻ tài nguyên được giải quyết bằng cơ chế đồng bộ hóa như khóa (lock) nhưng Golang có cách tiếp cận riêng là chia sẻ giá trị thông qua channel.
Goroutine trao đổi giá trị qua channel
Trên thực tế khi nhiều thread thực thi độc lập chúng hiếm khi chủ động chia sẻ tài nguyên. Tại bất kỳ thời điểm nào, tốt nhất là chỉ Goroutine sở hữu tài nguyên của chính mình. Golang có một triết lý được thể hiện bằng slogan:
Do not communicate by sharing memory; instead, share memory by communicating.
Do not communicate through shared memory, but share memory through communication.
Mặc dù các vấn đề tương tranh đơn giản như tham chiếu đến biến đếm có thể được hiện thực bằng atomic operations
hoặc mutex lock
, nhưng việc kiểm soát truy cập thông qua Channel giúp cho code của chúng ta clean và “Golang” hơn.
Áp dụng Mutex
Xem xét đoạn code sau:
func
main() {
var
mu sync.Mutex
go
func
(){
fmt.Println("Hello World"
)
mu.Lock()
}()
mu.Unlock()
}
Ở đây, mu.Lock()
và mu.Unlock()
không ở trong cùng một Goroutine, vì vậy nó không đáp ứng được mô hình bộ nhớ nhất quán tuần tự (sequential consistency memory model).
Sửa lại đoạn code trên như sau:
func
main() {
var
mu sync.Mutex
mu.Lock()
go
func
(){
fmt.Println("Hello World"
)
mu.Unlock()
}()
mu.Lock()
}
Áp dụng Channel
Đồng bộ hóa với mutex là một cách tiếp cận ở mức độ tương đối đơn giản. Bây giờ ta sẽ sử dụng một unbuffered channel để hiện thực đồng bộ hóa:
func
main() {
done := make
(chan
int
)
go
func
(){
fmt.Println("Hello World"
)
<-done
}()
done <- 1
}
Cách này gặp bất cập với buffered channel vì lúc đó không có gì đảm bảo rằng goroutine sẽ in ra trước khi thoát main
. Cách tiếp cận tốt hơn là hoán đổi hướng gửi và nhận của channel để tránh các sự kiện đồng bộ hóa bị ảnh hưởng bởi kích thước buffer của nó:
func
main() {
done := make
(chan
int
, 1
)
go
func
(){
fmt.Println("Hello World"
)
done <- 1
}()
<-done
}
Dựa trên buffered channel, chúng ta có thể dễ dàng mở rộng thread print đến N. Ví dụ sau là mở 10 goroutine để in riêng biệt:
func
main() {
done := make
(chan
int
, 10
)
for
i := 0
; i < cap
(done); i++ {
go
func
(){
fmt.Println("Hello World"
)
done <- 1
}()
}
for
i := 0
; i < cap
(done); i++ {
<-done
}
}
Sử dụng sync.WaitGroup thay cho Channel
Một cách đơn giản hơn là sử dụng sync.WaitGroup
để chờ một tập các sự kiện:
func
main() {
var
wg sync.WaitGroup
for
i := 0
; i < 10
; i++ {
wg.Add(1
)
go
func
() {
fmt.Println("Hello World"
)
wg.Done()
}()
}
wg.Wait()
}
1.6.2. Tác vụ Atomic
Tác vụ atomic trên một vùng nhớ chia sẻ thì đảm bảo rằng vùng nhớ đó chỉ có thể được truy cập bởi một Goroutine tại một thời điểm. Để đạt được điều này ta có thể dùng sync.Mutex.
Sử dụng sync.Mutex
import
(
"sync"
)
var
total struct
{
sync.Mutex
value int
}
func
worker(wg *sync.WaitGroup) {
defer
wg.Done()
for
i := 0
; i <= 100
; i++ {
total.Lock()
total.value += i
total.Unlock()
}
}
func
main() {
var
wg sync.WaitGroup
wg.Add(2
)
go
worker(&wg)
go
worker(&wg)
wg.Wait()
fmt.Println(total.value)
}
Trong một chương trình đồng thời, ta cần có cơ chế để lock
và unlock
trước và sau khi truy cập vào vùng critical section. Nếu không có sự bảo vệ biến total
, kết quả cuối cùng có thể bị sai khác do sự truy cập đồng thời của nhiều thread.
Sử dụng sync/atomic
Thay vì dùng mutex, chúng ta cũng có thể dùng package sync/atomic, đây là giải pháp hiệu quả hơn đối với một biến số học.
import
(
"sync"
"sync/atomic"
)
var
total uint64
func
worker(wg *sync.WaitGroup) {
defer
wg.Done()
var
i uint64
for
i = 0
; i <= 100
; i++ {
atomic.AddUint64(&total, i)
}
}
func
main() {
var
wg sync.WaitGroup
wg.Add(2
)
go
worker(&wg)
go
worker(&wg)
wg.Wait()
fmt.Println(total)
}
Để ghi và đọc atomic trên những đối tượng phức tạp hơn thì ta dùng kiểu atomic.Value, ví dụ:
package
main
import
(
"sync/atomic"
"time"
)
func
loadConfig() map
[string
]string
{
return
make
(map
[string
]string
)
}
func
requests() chan
int
{
return
make
(chan
int
)
}
func
main() {
var
config atomic.Value
config.Store(loadConfig())
go
func
() {
for
{
time.Sleep(10
* time.Second)
config.Store(loadConfig())
}
}()
for
i := 0
; i < 10
; i++ {
go
func
() {
for
r := range
requests() {
c := config.Load()
_, _ = r, c
}
}()
}
}
1.6.3. Mô hình Producer Consumer
Mô hình Producer – Consumer
Ví dụ phổ biến nhất về lập trình concurrency là mô hình Producer Consumer, giúp tăng tốc độ xử lý chung của chương trình bằng cách cân bằng sức mạnh của các thread “sản xuất” (produce) và “tiêu thụ” (consume).
Producer tạo ra một số dữ liệu và sau đó đưa nó vào hàng đợi, cùng lúc đó consumer cũng lấy dữ liệu từ hàng đợi này ra để xử lý. Điều này làm cho produce và consume trở thành hai quá trình bất đồng bộ. Khi không có dữ liệu trong hàng đợi kết quả, consumer sẽ chờ đợi ở trạng thái “đói”, còn khi dữ liệu trong hàng đợi bị đầy, producer phải đối mặt với vấn đề mất mát dữ liệu khi CPU phải loại bỏ bớt dữ liệu trong đó để nạp thêm.
Golang hiện thực cơ chế này khá đơn giản:
func
Producer(factor int
, out chan
<- int
) {
for
i := 0
; ; i++ {
out <- i*factor
}
}
func
Consumer(in <-chan
int
) {
for
v := range
in {
fmt.Println(v)
}
}
func
main() {
ch := make
(chan
int
, 64
)
go
Producer(3
, ch)
go
Producer(5
, ch)
go
Consumer(ch)
time.Sleep(5
* time.Second)
}
Chúng ta có thể để hàm main
giữ trạng thái block mà không thoát và chỉ thoát khỏi chương trình khi người dùng gõ Ctrl-C
:
func
main() {
ch := make
(chan
int
, 64
)
go
Producer(3
, ch)
go
Producer(5
, ch)
go
Consumer(ch)
sig := make
(chan
os.Signal, 1
)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
fmt.Printf("quit (%v)\n"
, <-sig)
}
Có 2 producer trong ví dụ trên và không có sự kiện đồng bộ nào giữa hai producer mà chúng concurrency. Do đó, thứ tự của chuỗi output ở consumer là không xác định.
Mô hình publish-and-subscribe thường được viết tắt là mô hình pub/sub. Trong mô hình này, producer trở thành publisher và consumer trở thành subscriber, đồng thời producer:consumer là mối quan hệ M:N.
Mô hình Publish – Subscribe
Trong mô hình producer-consumer truyền thống, thông điệp được gửi đến hàng đợi và mô hình publish-subscription sẽ publish thông điệp đến một topic.
Để hiện thực mô hình này ta implement package pubsub
:
package
pubsub
import
(
"sync"
"time"
)
type
(
subscriber chan
interface
{}
topicFunc func
(v interface
{}) bool
)
type
Publisher struct
{
m sync.RWMutex
buffer int
timeout time.Duration
subscribers map
[subscriber]topicFunc
}
func
NewPublisher(publishTimeout time.Duration, buffer int
) *Publisher {
return
&Publisher{
buffer: buffer,
timeout: publishTimeout,
subscribers: make
(map
[subscriber]topicFunc),
}
}
func
(p *Publisher) Subscribe() chan
interface
{} {
return
p.SubscribeTopic(nil
)
}
func
(p *Publisher) SubscribeTopic(topic topicFunc) chan
interface
{} {
ch := make
(chan
interface
{}, p.buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return
ch
}
func
(p *Publisher) Evict(sub chan
interface
{}) {
p.m.Lock()
defer
p.m.Unlock()
delete
(p.subscribers, sub)
close
(sub)
}
func
(p *Publisher) Publish(v interface
{}) {
p.m.RLock()
defer
p.m.RUnlock()
var
wg sync.WaitGroup
for
sub, topic := range
p.subscribers {
wg.Add(1
)
go
p.sendTopic(sub, topic, v, &wg)
}
wg.Wait()
}
func
(p *Publisher) Close() {
p.m.Lock()
defer
p.m.Unlock()
for
sub := range
p.subscribers {
delete
(p.subscribers, sub)
close
(sub)
}
}
func
(p *Publisher) sendTopic(
sub subscriber, topic topicFunc, v interface
{}, wg *sync.WaitGroup,
) {
defer
wg.Done()
if
topic != nil
&& !topic(v) {
return
}
select
{
case
sub <- v:
case
<-time.After(p.timeout):
}
}
Trong ví dụ sau đây, 2 subscriber đăng ký hết tất cả các topic với “golang”:
import
(
"./pubsub"
"time"
"strings"
"fmt"
)
func
main() {
p := pubsub.NewPublisher(100
*time.Millisecond, 10
)
defer
p.Close()
all := p.Subscribe()
golang := p.SubscribeTopic(func
(v interface
{}) bool
{
if
s, ok := v.(string
); ok {
return
strings.Contains(s, "golang"
)
}
return
false
})
p.Publish("hello, world!"
)
p.Publish("hello, golang!"
)
go
func
() {
for
msg := range
all {
fmt.Println("all:"
, msg)
}
} ()
go
func
() {
for
msg := range
golang {
fmt.Println("golang:"
, msg)
}
} ()
time.Sleep(3
* time.Second)
}
Trong mô hình pub/sub, mỗi thông điệp được gửi tới nhiều subscriber. Publisher thường không biết hoặc không quan tâm subscriber nào nhận được thông điệp. Subscriber và publisher có thể được thêm vào động ở thời điểm thực thi, cho phép các hệ thống phức tạp có thể phát triển theo thời gian. Trong thực tế, những ứng dụng như dự báo thời tiết có thể áp dụng mô hình concurrency này.
1.6.5. Kiểm soát số lượng goroutine
Goroutine là một tính năng mạnh mẽ của Go, mất chi phí rất ít để sử dụng, những tất nhiên nếu dùng với số lượng quá lớn sẽ chiếm gây nhiều lãng phí và cần có một cơ chế để kiểm soát. Một cách thông dụng để đạt được mục đích trên là dùng worker pool.
Mô hình Worker pool
Đầu tiên tạo ra các worker:
func
worker(queue chan
int
, worknumber int
, done chan
bool
) {
for
j := range
queue {
fmt.Println("worker"
, worknumber, "finished job"
, j)
done <- true
}
}
Sau đó có thể áp dụng như sau:
func
main() {
q := make
(chan
int
)
done := make
(chan
bool
)
numberOfWorkers := 4
for
i := 0
; i < numberOfWorkers; i++ {
go
worker(q, i, done)
}
numberOfJobs := 17
for
j := 0
; j < numberOfJobs; j++ {
go
func
(j int
) {
q <- j
}(j)
}
for
c := 0
; c < numberOfJobs; c++ {
<-done
}
}
1.6.6. Dọn dẹp Goroutine
Sau khi job queue rỗng, ta sẽ phải dừng tất cả worker. Goroutine dù khá nhẹ nhưng vẫn không phải miễn phí, nhất là với các hệ thống lớn, dù chỉ là các chi phí nhỏ nhất cũng có thể trở nên khác biệt lớn nếu thay đổi.
Cách đơn giản là dùng kill channel để phát ra tín hiệu ngừng cho goroutine.
func
main() {
killsignal := make
(chan
bool
)
q := make
(chan
int
)
done := make
(chan
bool
)
numberOfWorkers := 4
for
i := 0
; i < numberOfWorkers; i++ {
go
worker(q, i, done, killsignal)
}
numberOfJobs := 17
for
j := 0
; j < numberOfJobs; j++ {
go
func
(j int
) {
q <- j
}(j)
}
for
c := 0
; c < numberOfJobs; c++ {
<-done
}
close
(killsignal)
time.Sleep(2
* time.Second)
}
Trong đó các worker được thiết kế như sau:
func
worker(queue chan
int
, worknumber int
, done, ks chan
bool
) {
for
true
{
select
{
case
k := <-queue:
fmt.Println("doing work!"
, k, "worknumber"
, worknumber)
done <- true
case
<-ks:
fmt.Println("worker halted, number"
, worknumber)
return
}
}
}
1.6.7. Sàng số nguyên tố
Trong phần 1.2, chúng tôi đã trình bày việc triển khai phiên bản concurrency của sàng số nguyên tố để chứng minh tính concurrency của Newsqueak. Nguyên tắc “sàng số nguyên tố” như sau:
Sàng số nguyên tố
Chúng ta cần khởi tạo một chuỗi các số tự nhiên 2, 3, 4, ...
(không bao gồm 0, 1):
func
GenerateNatural() chan
int
{
ch := make
(chan
int
)
go
func
() {
for
i := 2
; ; i++ {
ch <- i
}
}()
return
ch
}
Tiếp theo xây dựng một sàng cho mỗi số nguyên tố: đề xuất một số là bội số của số nguyên tố trong chuỗi đầu vào và trả về một chuỗi mới, đó là một channel mới.
func
PrimeFilter(in <-chan
int
, prime int
) chan
int
{
out := make
(chan
int
)
go
func
() {
for
{
if
i := <-in; i%prime != 0
{
out <- i
}
}
}()
return
out
}
Bây giờ ta có thể sử dụng bộ lọc này trong hàm main
:
func
main() {
ch := GenerateNatural()
for
i := 0
; i < 100
; i++ {
prime := <-ch
fmt.Printf("%v: %v\n"
, i+1
, prime)
ch = PrimeFilter(ch, prime)
}
}
1.6.8. Kẻ thắng làm vua
Có nhiều động lực để lập trình concurrency nhưng tiêu biểu là vì lập trình concurrency có thể đơn giản hóa các vấn đề. Lập trình concurrency cũng có thể cải thiện hiệu năng. Mở hai thread trên CPU đa lõi thường nhanh hơn mở một thread. Trên thực tế về mặt cải thiện hiệu suất, chương trình không chỉ đơn giản là chạy nhanh, mà trong nhiều trường hợp chương trình có thể đáp ứng yêu cầu của người dùng một cách nhanh chóng là điều quan trọng nhất. Khi không có yêu cầu từ người dùng cần xử lý, nên xử lý một số tác vụ nền có độ ưu tiên thấp.
Giả sử chúng ta muốn nhanh chóng tìm kiếm các chủ đề liên quan đến “golang”, có thể mở nhiều công cụ tìm kiếm như Bing, Google hoặc Yahoo. Khi tìm kiếm trả về kết quả trước, ta có thể đóng các trang tìm kiếm khác. Do ảnh hưởng của môi trường mạng và thuật toán của công cụ tìm kiếm mà một số công cụ tìm kiếm có thể trả về kết quả tìm kiếm nhanh hơn. Chúng ta có thể sử dụng một chiến lược tương tự để viết chương trình này:
func
main() {
ch := make
(chan
string
, 32
)
go
func
() {
ch <- searchByBing("golang"
)
}()
go
func
() {
ch <- searchByGoogle("golang"
)
}()
go
func
() {
ch <- searchByBaidu("golang"
)
}()
fmt.Println(<-ch)
}
Áp dụng ý tưởng trên có thể giúp cải thiện hiệu suất bằng cách chọn lấy kẻ chiến thắng trong cuộc đua thời gian.
1.6.9. Context package
Ở thời điểm phát hành Go1.7, thư viện tiêu chuẩn đã thêm một package context để đơn giản hóa hoạt động của dữ liệu, thời gian chờ và thoát giữa nhiều Goroutines. Package context định nghĩa kiểu Context, chứa deadline, cancelation signal và các giá trị request-scope giữa các API và giữa các process.
Chúng ta có thể sử dụng package context để hiện thực lại cơ chế kiểm soát timeout:
func
worker(ctx context.Context, wg *sync.WaitGroup) error {
defer
wg.Done()
for
{
select
{
default
:
fmt.Println("hello"
)
case
<-ctx.Done():
return
ctx.Err()
}
}
}
func
main() {
ctx, cancel := context.WithTimeout(context.Background(), 10
*time.Second)
var
wg sync.WaitGroup
for
i := 0
; i < 10
; i++ {
wg.Add(1
)
go
worker(ctx, &wg)
}
time.Sleep(time.Second)
cancel()
wg.Wait()
}
Golang tự động lấy lại bộ nhớ, do đó bộ nhớ thường không bị rò rỉ (memory leak). Trong ví dụ trước về sàng số nguyên tố, một Goroutine mới được đưa vào bên trong hàm GenerateNatural
và Goroutine nền PrimeFilter
có nguy cơ bị leak khi hàm main
không còn sử dụng channel. Chúng ta có thể tránh vấn đề này với package context. Dưới đây là phần triển khai sàng số nguyên tố được cải thiện:
func
GenerateNatural(ctx context.Context) chan
int
{
ch := make
(chan
int
)
go
func
() {
for
i := 2
; ; i++ {
select
{
case
<- ctx.Done():
return
case
ch <- i:
}
}
}()
return
ch
}
func
PrimeFilter(ctx context.Context, in <-chan
int
, prime int
) chan
int
{
out := make
(chan
int
)
go
func
() {
for
{
if
i := <-in; i%prime != 0
{
select
{
case
<- ctx.Done():
return
case
out <- i:
}
}
}
}()
return
out
}
func
main() {
ctx, cancel := context.WithCancel(context.Background())
ch := GenerateNatural(ctx)
for
i := 0
; i < 100
; i++ {
prime := <-ch
fmt.Printf("%v: %v\n"
, i+1
, prime)
ch = PrimeFilter(ctx, ch, prime)
}
cancel()
}
Khi hàm main
kết thúc hoạt động, nó được thông báo bằng lệnh cancel()
gọi đến Goroutine nền để thoát, do đó tránh khỏi việc leak Goroutine.
Concurrency là một chủ đề rất lớn, và ở đây chúng tôi chỉ đưa ra một vài ví dụ về lập trình concurrency rất cơ bản. Tài liệu chính thức cũng có rất nhiều cuộc thảo luận về lập trình concurrency, có khá nhiều cuốn sách thảo luận cụ thể về lập trình concurrency trong Golang. Độc giả có thể tham khảo các tài liệu liên quan theo nhu cầu của mình.