Go语言并发目录遍历
在本节中,我们将构建一个程序,根据命令行指定的输入,报告一个或多个目录的磁盘使用情况,类似于 UNIX 的
程序的完整代码如下所示,代码中 main 函数使用两个 goroutine,后台 goroutine 调用 walkDir 遍历命令行上指定的每一个目录,最后关闭 fileSizes 通道;主 goroutine 计算从通道中接收的文件的大小的和,最后输出总数。
运行结果如下所示:
所以,下面为每一个 walkDir 的调用创建一个新的 goroutine。它使用 sync.WaitGroup 来为当前存活的 walkDir 调用计数,一个 goroutine 在计数器减为 0 的时候关闭 fileSizes 通道。
du
命令。该程序大多数工作是由下面的 walkDir 函数完成,它使用 dirents 辅助函数来枚举目录中的条目,如下所示:// wakjDir 递归地遍历以 dir 为根目录的整个文件树,并在 filesizes 上发送每个已找到文件的大小 func walkDir(dir string, fileSizes chan<- int64) { for _, entry := range dirents(dir) { if entry.IsDir() { subdir := filepath.Join(dir, entry.Name()) walkDir(subdir, fileSizes) } else { fileSizes <- entry.Size() } } } // dirents 返回 dir 目录中的条目 func dirents(dir string) []os.FileInfo { entries, err := ioutil.ReadDir(dir) if err != nil { fmt.Fprintf(os.Stderr, "du1: %v\n", err) return nil } return entries }ioutil.ReadDir 函数返回一个 os.FileInfo 类型的 slice,针对单个文件同样的信息可以通过调用 os.Stat 函数来返回。对每一个子目录,walkDir 递归调用它自己,对于每一个文件,walkDir 发送一条消息到 fileSizes 通道,消息的内容为文件所占用的字节数。
程序的完整代码如下所示,代码中 main 函数使用两个 goroutine,后台 goroutine 调用 walkDir 遍历命令行上指定的每一个目录,最后关闭 fileSizes 通道;主 goroutine 计算从通道中接收的文件的大小的和,最后输出总数。
package main import ( "flag" "fmt" "io/ioutil" "os" "path/filepath" ) func main() { // 确定初始目录 flag.Parse() roots := flag.Args() if len(roots) == 0 { roots = []string{"."} } // 遍历文件树 fileSizes := make(chan int64) go func() { for _, root := range roots { walkDir(root, fileSizes) } close(fileSizes) }() // 输出结果 var nfiles, nbytes int64 for size := range fileSizes { nfiles++ nbytes += size } printDiskUsage(nfiles, nbytes) } func printDiskUsage(nfiles, nbytes int64) { fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9) } // wakjDir 递归地遍历以 dir 为根目录的整个文件树,并在 filesizes 上发送每个已找到的文件的大小 func walkDir(dir string, fileSizes chan<- int64) { for _, entry := range dirents(dir) { if entry.IsDir() { subdir := filepath.Join(dir, entry.Name()) walkDir(subdir, fileSizes) } else { fileSizes <- entry.Size() } } } // dirents 返回 dir 目录中的条目 func dirents(dir string) []os.FileInfo { entries, err := ioutil.ReadDir(dir) if err != nil { fmt.Fprintf(os.Stderr, "du1: %v\n", err) return nil } return entries }在输出结果前,程序等待较长时间:
go run main.go D:/code
18681 files 0.5 GB
-v
标识的时候周期性的输出当前目录的总和,如果只想看到最终的结果省略-v
即可。package main import ( "flag" "fmt" "io/ioutil" "os" "path/filepath" "time" ) var verbose = flag.Bool("v", false, "显示详细进度") func main() { // ...启动后台 goroutine... // 确定初始目录 flag.Parse() roots := flag.Args() if len(roots) == 0 { roots = []string{"."} } // 遍历文件树 fileSizes := make(chan int64) go func() { for _, root := range roots { walkDir(root, fileSizes) } close(fileSizes) }() // 定期打印结果 var tick <-chan time.Time if *verbose { tick = time.Tick(500 * time.Millisecond) } var nfiles, nbytes int64 loop: for { select { case size, ok := <-fileSizes: if !ok { break loop // fileSizes 关闭 } nfiles++ nbytes += size case <-tick: printDiskUsage(nfiles, nbytes) } } printDiskUsage(nfiles, nbytes) // 最终总数 } func printDiskUsage(nfiles, nbytes int64) { fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9) } // wakjDir 递归地遍历以 dir 为根目录的整个文件树,并在 filesizes 上发送每个已找到的文件的大小 func walkDir(dir string, fileSizes chan<- int64) { for _, entry := range dirents(dir) { if entry.IsDir() { subdir := filepath.Join(dir, entry.Name()) walkDir(subdir, fileSizes) } else { fileSizes <- entry.Size() } } } // dirents 返回 dir 目录中的条目 func dirents(dir string) []os.FileInfo { entries, err := ioutil.ReadDir(dir) if err != nil { fmt.Fprintf(os.Stderr, "du1: %v\n", err) return nil } return entries }因为这个程序没有使用 range 循环,所以第一个 select 情况必须显式判断 fileSizes 通道是否已经关闭,使用两个返回值的形式进行接收操作。如果通道已经关闭,程序退出循环。标签化的 break 语句将跳出 select 和 for 循环的逻辑。没有标签的 break 只能跳出 select 的逻辑,导致循环的下一次迭代。
运行结果如下所示:
go run main.go -v D:\
296077 files 57.9 GB
302142 files 58.0 GB
306669 files 58.1 GB
314725 files 58.2 GB
320050 files 58.3 GB
341713 files 58.6 GB
346102 files 64.2 GB
所以,下面为每一个 walkDir 的调用创建一个新的 goroutine。它使用 sync.WaitGroup 来为当前存活的 walkDir 调用计数,一个 goroutine 在计数器减为 0 的时候关闭 fileSizes 通道。
package main import ( "flag" "fmt" "io/ioutil" "os" "path/filepath" "sync" "time" ) var verbose = flag.Bool("v", false, "显示详细进度") func main() { // ...确定根目录... flag.Parse() // 确定初始目录 roots := flag.Args() if len(roots) == 0 { roots = []string{"."} } // 并行遍历每一个文件树 fileSizes := make(chan int64) var n sync.WaitGroup for _, root := range roots { n.Add(1) go walkDir(root, &n, fileSizes) } go func() { n.Wait() close(fileSizes) }() // 定期打印结果 var tick <-chan time.Time if *verbose { tick = time.Tick(500 * time.Millisecond) } var nfiles, nbytes int64 loop: for { select { case size, ok := <-fileSizes: if !ok { break loop // fileSizes 关闭 } nfiles++ nbytes += size case <-tick: printDiskUsage(nfiles, nbytes) } } printDiskUsage(nfiles, nbytes) // 最终总数 } func printDiskUsage(nfiles, nbytes int64) { fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9) } func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) { defer n.Done() for _, entry := range dirents(dir) { if entry.IsDir() { n.Add(1) subdir := filepath.Join(dir, entry.Name()) go walkDir(subdir, n, fileSizes) } else { fileSizes <- entry.Size() } } } // sema是一个用于限制目录并发数的计数信号量 var sema = make(chan struct{}, 20) // dirents返回directory目录中的条目 func dirents(dir string) []os.FileInfo { sema <- struct{}{} // 获取令牌 defer func() { <-sema }() // 释放令牌 entries, err := ioutil.ReadDir(dir) if err != nil { fmt.Fprintf(os.Stderr, "du: %v\n", err) return nil } return entries }尽管系统与系统之间有很多的不同,但是这个版本的速度比前一个版本快几倍。
所有教程
- C语言入门
- C语言编译器
- C语言项目案例
- 数据结构
- C++
- STL
- C++11
- socket
- GCC
- GDB
- Makefile
- OpenCV
- Qt教程
- Unity 3D
- UE4
- 游戏引擎
- Python
- Python并发编程
- TensorFlow
- Django
- NumPy
- Linux
- Shell
- Java教程
- 设计模式
- Java Swing
- Servlet
- JSP教程
- Struts2
- Maven
- Spring
- Spring MVC
- Spring Boot
- Spring Cloud
- Hibernate
- Mybatis
- MySQL教程
- MySQL函数
- NoSQL
- Redis
- MongoDB
- HBase
- Go语言
- C#
- MATLAB
- JavaScript
- Bootstrap
- HTML
- CSS教程
- PHP
- 汇编语言
- TCP/IP
- vi命令
- Android教程
- 区块链
- Docker
- 大数据
- 云计算