1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
|
import (
"encoding/binary"
"io"
"math/rand"
"sort"
)
// empty file:不包括main(),用于他方调用的库编码
// 接收一个int数组,扔进channel,返回
func ArraySource(srcArr ...int) chan int {
// 创建channel
out := make(chan int)
// channel内容只能通过goroutine进行传送
go func() {
for _, v := range srcArr {
// 数组内容扔进channel
out <- v
}
// 关闭通道
close(out)
}()
return out
}
// 增加<- 表示只进不出
func ArraySource2(srcArr ...int) <-chan int {
out := make(chan int)
go func() {
for _, v := range srcArr {
out <- v
}
close(out)
}()
return out
}
// 内存排序
func InMemorySort(in <-chan int) <-chan int {
out := make(chan int, 1024)
go func() {
arr := []int{}
// 输入的chan,读取内容到slice中
for v := range in {
arr = append(arr, v)
}
// 排序
sort.Ints(arr)
// 扔进chan
for _, v := range arr {
out <- v
}
close(out)
}()
return out
}
// 合并节点,归并操作
func Merge(in1, in2 <-chan int) <-chan int {
out := make(chan int, 1024)
go func() {
v1, ok1 := <-in1
v2, ok2 := <-in2
// 接收方:ok判定有无元素
for ok1 || ok2 {
// in2无值取in1
// in1中有值,且小于in2中的值
if !ok2 || (ok1 && v1 < v2) {
// 将v1扔进out
out <- v1
// 继续读in1
v1, ok1 = <-in1
} else {
// 将v2扔进out
out <- v2
// 继续读in2
v2, ok2 = <-in2
}
}
// 发送方:channel关闭
close(out)
}()
return out
}
// 归并多个节点
func MergeN(inputs ...<-chan int) <-chan int {
size := len(inputs)
if size == 1 {
return inputs[0]
}
middle := size / 2
return Merge(MergeN(inputs[:middle]...), MergeN(inputs[middle:]...))
}
// 文件系统读取源数据
func ReaderSource(reader io.Reader, chunkSize int) <-chan int {
out := make(chan int, 1024)
go func() {
// 指定八位
buffer := make([]byte, 8)
// 已读的字节数
bytesRead := 0
for {
// 读到的字节数
n, err := reader.Read(buffer)
bytesRead += n
if n > 0 {
// 转换文件字节到channel
out <- int(binary.BigEndian.Uint64(buffer))
}
// 处理错误
// 超出了预设的块大小
exceedChunk := chunkSize != -1 && chunkSize <= bytesRead
if err != nil || exceedChunk {
break
}
}
close(out)
}()
return out
}
// 写数据
func WriterSink(writer io.Writer, in <-chan int) {
for v := range in {
buffer := make([]byte, 8)
binary.BigEndian.PutUint64(buffer, uint64(v))
_, err := writer.Write(buffer)
if err != nil {
panic(err)
}
}
}
// 生成随机数据源
func RandomSource(count int) <-chan int {
out := make(chan int)
go func() {
for i := 0; i < count; i++ {
out <- rand.Int()
}
close(out)
}()
return out
}
|