一.同时向管道读写数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import "fmt"

func main() {
writeChan := make(chan int, 20) // 写入通道,缓冲大小为20
exitChan := make(chan bool) // 退出通道

go readData(writeChan, exitChan) // 启动readData协程
go writeData(writeChan) // 启动writeData协程

for {
v, ok := <-exitChan // 接收退出通道值
if ok {
fmt.Println("完成:", v) // 打印完成信息
break // 退出循环
}
}
}

func writeData(writeChan chan int) {
for i := 1; i <= 20; i++ {
writeChan <- i // 将i写入通道
fmt.Println("写入数据~~:", i) // 打印写入数据信息
}
close(writeChan) // 关闭通道
}

func readData(writeChan chan int, exitChan chan bool) {
for v := range writeChan {
fmt.Println("读取数据~~:", v) // 打印读取数据信息
}
// 结束
exitChan <- true // 向退出通道发送完成信号
close(exitChan) // 关闭通道
}

二.协程案例-计算2000个数各个数的累加和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

import (
"fmt"
"sync"
)

var (
wg3 sync.WaitGroup // wg3用于等待PutData函数执行完成
wg4 sync.WaitGroup // wg4用于等待SumUp函数执行完成
)

func main() {
numChan := make(chan int, 2000) // 创建一个缓冲大小为2000的整型通道
wg3.Add(2000) // 将等待计数器设置为2000
go PutData(numChan) // 启动PutData协程,向numChan通道写入数据
wg3.Wait() // 等待PutData协程执行完成
close(numChan) // 关闭numChan通道
fmt.Println("num:", len(numChan)) // 打印numChan通道内的数据个数(应为2000)
wg4.Add(2000) // 将等待计数器设置为2000
resChan := make(chan map[int]int, 2000) // 创建一个缓冲大小为2000的映射整型到映射整型的通道
for n := range numChan {
go SumUp(n, resChan) // 启动SumUp协程,计算numChan中的每个数字的累加和并存储到resChan通道
}
wg4.Wait() // 等待所有SumUp协程执行完成
close(resChan) // 关闭resChan通道
fmt.Println("res:", len(resChan)) // 打印resChan通道内的数据个数(应为2000)
for res := range resChan {
for key, val := range res {
fmt.Printf("res[%v]=%v\n", key, val) // 遍历输出resChan通道中的结果
}
}
}

// SumUp 计算累加和
func SumUp(n int, resChan chan map[int]int) {
sumMap := make(map[int]int) // 创建一个整型到整型的映射
res := 0 // 初始化res为0
for i := 1; i <= n; i++ {
res += i // 将i累加到res上
}
sumMap[n] = res // 将res存储到sumMap中对应的n位置
resChan <- sumMap // 将sumMap发送到resChan通道
defer wg4.Done() // 在协程结束时,将等待计数器减1
}

func PutData(numChan chan int) {
for i := 1; i <= 2000; i++ {
numChan <- i // 将i发送到numChan通道
wg3.Done() // 等待发送完成后,将等待计数器减1
}
}

三.生产1000个数据保存文件,读取文件排序后另存文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package main

import (
"bufio"
"fmt"
"io"
"math/rand"
"os"
"strconv"
"strings"
"sync"
"time"
)

var (
wg5 sync.WaitGroup // wg5用于等待所有goroutine完成
)

// goroutine+channel实现写入文件和排序
func main() {
wg5.Add(1000) // 增加1000个等待的goroutine
go writeDataToFile() // 启动写入数据到文件的goroutine
wg5.Wait() // 等待所有goroutine完成
fmt.Println("文件写入完成!!!!!!!!!!")
fmt.Println()

fmt.Println("读取所写文件对其排序生成新的文件!!!!!!!")
dataChan := make(chan int, 1000) // 创建一个容量为1000的整型通道
readDataToChannel(dataChan) // 启动读取数据到通道的goroutine
sortToSave(dataChan) // 对通道中的数据进行排序和保存
}

// 将数据放入切片 对切片排序 再写入文件保存
func sortToSave(dataChan chan int) {
dataSlice := make([]int, len(dataChan)) // 创建一个与通道长度相同的切片
for i := 0; i < len(dataSlice); i++ {
for data := range dataChan { // 从通道中接收数据
dataSlice[i] = data
break
}
}
fmt.Println(len(dataSlice)) // 打印切片长度, 为1000
QuickSort(dataSlice) // 使用快速排序
fmt.Println("排序后:", dataSlice)

file, err := os.OpenFile("d:/go-test/sortData.txt", os.O_WRONLY|os.O_TRUNC|os.O_APPEND|os.O_CREATE, 0666) // 打开或创建文件
if err != nil {
fmt.Println("open file err ", err)
return
}
defer file.Close() // 函数返回前关闭文件
writer := bufio.NewWriter(file) // 创建写入器

for i := 0; i < len(dataSlice); i++ {
_, err := writer.WriteString(strconv.Itoa(dataSlice[i]) + ",") // 将切片中的数据写入文件
if err != nil {
fmt.Println("write file err ", err)
break
}
}
writer.Flush() // 刷新写入器
fmt.Println("有序文件写入完成!!!!!")
}

// 随机生产1000个数据写入文件
func writeDataToFile() {
rand.Seed(time.Now().UnixNano()) // 使用当前时间作为随机数种子
file, err := os.OpenFile("d:/go-test/data.txt", os.O_WRONLY|os.O_TRUNC|os.O_APPEND|os.O_CREATE, 0666) // 打开或创建文件
if err != nil {
fmt.Println("open file err ", err)
return
}
defer file.Close() // 函数返回前关闭文件

writer := bufio.NewWriter(file) // 创建写入器

for i := 0; i < 1000; i++ {
_, err := writer.WriteString(strconv.Itoa(rand.Intn(1000)+1) + ",") // 随机生成1000个数据并写入文件
wg5.Done() // wg5计数减1
if err != nil {
fmt.Println("write file err ", err)
break
}
}
writer.Flush() // 刷新写入器
}

// 从文件中读取数据到通道
func readDataToChannel(dataChan chan int) {
file, err := os.Open("d:/go-test/data.txt") // 打开文件
if err != nil {
fmt.Println("open file err ", err)
return
}
defer file.Close() // 函数返回前关闭文件
reader := bufio.NewReader(file) // 创建读取器

for {
dataStr, errRead := reader.ReadString(',') // 从文件中读取数据直到文件末尾或发生错误
if errRead != nil {
if errRead == io.EOF {
fmt.Println("读取结束!!!!")
} else {
fmt.Println("读取错误:", errRead)
}
break
}
atoi, _ := strconv.Atoi(strings.Trim(dataStr, ",")) // 将字符串转换为整数
dataChan <- atoi // 将整数发送到通道
}

close(dataChan) // 在写入所有数据后关闭通道
fmt.Println("长度:", len(dataChan)) // 打印通道长度, 为1000
}

// QuickSort 快速排序
func QuickSort(arr []int) {
// 结束条件
if len(arr) < 2 {
return
}
left, right := 0, len(arr)-1 // 定义分区点的左右指针
pivot := right // 将分区点设置为数组的最后一个元素

for i := 0; i < len(arr); i++ {
if arr[i] < arr[pivot] {
arr[left], arr[i] = arr[i], arr[left] // 将较小的元素交换到左边
left++
}
}

arr[left], arr[right] = arr[right], arr[left] // 将分区点交换到中间
QuickSort(arr[:left]) // 对分区点左边的子数组进行递归排序
QuickSort(arr[left+1:]) // 对分区点右边的子数组进行递归排序
}