深入浅出golang errgroup.Group

原生支持并发是Go语言最强大的特性,比如channels和goroutines。但是对于Go语言初学者尝试并接受并发概念并不是很轻松。 Go团队

原生支持并发是Go语言最强大的特性,比如channels和goroutines。但是对于Go语言初学者尝试并接受并发概念并不是很轻松。

Go团队发布的第一个 goroutines 的管理工具是 sync.WaitGroup,WaitGroup会阻塞直到指定数量的goroutine已经完成执行,这是文档中的一个例子:

var wgsync.WaitGroup

var urls = []string{

        "http://www.golang.org/",

        "http://www.google.com/",

        "http://www.somestupidname.com/",

}

for _, url := range urls {

        // Increment the WaitGroup counter.

        wg.Add(1)

        // Launch a goroutine to fetch the URL.

        gofunc(urlstring) {

                // Decrement the counter when the goroutine completes.

                deferwg.Done()

                // Fetch the URL.

                http.Get(url)

        }(url)

}

// Wait for all HTTP fetches to complete.

wg.Wait()

 

WaitGroup 使你在处理并发任务时对goroutines的创建和停止的数量控制变得简单。每次你创建一个goroutine的时候只要调用Add()方法就可以了。当这个任务结束调用wg.Done()。等待所有的任务完成,调用 wg.Wait()。但是用 WatiGroup唯一的问题就是当你的goroutines出错时,你并不能知道出错的原因。

Extending sync.WaitGroup’s functionality

最近,Go团队在实验仓库中添加了一个名为sync.ErrGroup的新软件包。 sync.ErrGroup再sync.WaitGroup功能的基础上,增加了错误传递,以及在发生不可恢复的错误时取消整个goroutine集合,或者等待超时。我们同样来看一个示例:

var g errgroup.Group

var urls = []string{

"http://www.golang.org/",

"http://www.google.com/",

"http://www.somestupidname.com/",

}

for _, url := range urls {

// Launch a goroutine to fetch the URL.

url := url // https://golang.org/doc/faq#closures_and_goroutines

g.Go(func() error {

    // Fetch the URL.

    resp, err := http.Get(url)

    if err == nil {

        resp.Body.Close()

    }

    return err

})

}

// Wait for all HTTP fetches to complete.

if err := g.Wait(); err == nil {

fmt.Println("Successfully fetched all URLs.")

}

 

g.Go()方法不仅允许你传一个匿名的函数,而且还能捕获错误信息,你只要像这样返回一个错误 return err,这使开发者使用goroutines时开发效率显著提高。

为了测试sync.ErrGroup的所有功能,我写了一个小程序,用一个指定的模式递归搜索目录中的Go文件。这有助于在Go源代码树中查找已使用已弃用或更新的包的实例。要测试sync.ErrGroup的所有功能,我还为应用程序添加了超时设置在功能。 如果达到时间限制,所有搜索和处理goroutine将被取消,程序将结束

当程序运行时,它会生成以下结果:

$ gogrep -timeout 1000ms . fmt                                                                                                

gogrep.go

1 hits

 

如果你使用的参数不正确的话,你会看到下面的输出

gogrepbyBrianKetelsen

Flags:

-timeoutduration

    timeoutin milliseconds (default 500ms)

Usage:

gogrep [flags] pathpattern

 

How sync.ErrGroup makes application building easier

让我们看看我们是如何利用 sync.ErrGroup 来使程序写的更简单。我们将从一个 main() 函数开始。

package main

import (

    "bytes"

    "flag"

    "fmt"

    "io/ioutil"

    "log"

    "os"

    "path/filepath"

    "strings"

    "time"

 

    "golang.org/x/net/context"

    "golang.org/x/sync/errgroup"

)

funcmain() {

    duration := flag.Duration("timeout", 500*time.Millisecond, "timeout in milliseconds")

    flag.Usage = func() {

        fmt.Printf("%s by Brian Ketelsen/n", os.Args[0])

        fmt.Println("Usage:")

        fmt.Printf("    gogrep [flags] path pattern /n")

        fmt.Println("Flags:")

        flag.PrintDefaults()

    }

    flag.Parse()

    if flag.NArg() != 2 {

        flag.Usage()

        os.Exit(-1)

    }

    path := flag.Arg(0)

    pattern := flag.Arg(1)

    ctx, _ := context.WithTimeout(context.Background(), *duration)

    m, err := search(ctx, path, pattern)

    if err != nil {

        log.Fatal(err)

    }

    for _, name := range m {

        fmt.Println(name)

    }

    fmt.Println(len(m), "hits")

}

funcsearch(ctxcontext.Context, rootstring, patternstring) ([]string, error) {

    g, ctx := errgroup.WithContext(ctx)

    paths := make(chanstring, 100)

    // get all the paths

 

    g.Go(func() error {

        deferclose(paths)

 

        return filepath.Walk(root, func(pathstring, infoos.FileInfo, errerror) error {

            if err != nil {

                return err

            }

            if !info.Mode().IsRegular() {

                return nil

            }

            if !info.IsDir() && !strings.HasSuffix(info.Name(), ".go") {

                return nil

            }

 

            select {

            case paths <- path:

            case <-ctx.Done():

                return ctx.Err()

            }

            return nil

        })

 

    })

 

    c := make(chanstring, 100)

    for path := range paths {

        p := path

        g.Go(func() error {

            data, err := ioutil.ReadFile(p)

            if err != nil {

                return err

            }

            if !bytes.Contains(data, []byte(pattern)) {

                return nil

            }

            select {

            case c <- p:

            case <-ctx.Done():

                return ctx.Err()

            }

            return nil

        })

    }

    gofunc() {

        g.Wait()

        close(c)

    }()

 

    var m []string

    for r := range c {

        m = append(m, r)

    }

    return m, g.Wait()

}

 

在这行代码以前,是对命令行传进来的参数做了解析。这行以后才是我们真正感兴趣的代码。

ctx, _ := context.WithTimeout(context.Background(), *duration)

 

我创建了一个context.Context并且为它添加了超时设置,当超时时间到了,”ctx”将接收到channel的超时警告。WithTimeout同样也会返回一个取消的方法,但是我们不需要,所以用 “_” 来忽略掉了。

下面的search() 方法的有context,search path,和 search pattern。最后把找到的文件和数量输出。

参考文献: https://www.oreilly.com/learning/run-strikingly-fast-parallel-file-searches-in-go-with-sync-errgroup

https://godoc.org/golang.org/x/sync/errgroup#example-Group–Parallel

深入浅出golang errgroup.Group

未登录用户
全部评论0
到底啦