看到公司其他同学写的 Go 批量处理代码，这种风骚的写法感觉像来自平行世界——还能这么玩？下面是脚本里调用它的地方，工具源码放在后面的两个代码块里。
util.TaskConsumer[[]string](10).
SetP(lineopt.IterExcel2("xxx.xlsx")).
SetC(func(index int, row []string) (err error) {
if index == 1 {
return
}
// .....
// 这里是逻辑处理函数
return
}).
Run()
下面是封装好的两个包（lineopt 和 util）的源码。
package lineopt
import("bufio"
"fmt"
"github.com/xuri/excelize/v2"
"iter"
"log/slog"
"os")
func IterLine2(filePath string) iter.Seq2[int, string]
{
return func(
yield func(int, string) bool)
{
f, errF: = os.OpenFile(filePath, os.O_RDONLY, 0666)
if errF != nil
{
return
}
defer func(f * os.File)
{
err: = f.Close()
if err != nil
{
fmt.Println(err)
}
}(f)
scanner: = bufio.NewScanner(f)
index: = 1
for scanner.Scan()
{
line: = scanner.Text()
if !
yield(index, line)
{
return
}
index += 1
}
}
}
func IterLine(filePath string) iter.Seq[string]
{
return func(
yield func(string) bool)
{
for _, item: = range IterLine2(filePath)
{
if !
yield(item)
{
return
}
}
}
}
func MapIterExcel2(config ExcelTarget) iter.Seq2[int, [] string]
{
return func(
yield func(int, [] string) bool)
{
f, err: = excelize.OpenFile(config.FilePath)
if err != nil
{
slog.Error(err.Error())
return
}
defer f.Close()
targetSheet: = config.TargetSheet
if targetSheet == ""
{
targetSheet = f.GetSheetName(0)
}
rows, err: = f.Rows(targetSheet)
if err != nil
{
slog.Error(err.Error())
return
}
index: = 1
for rows.Next()
{
row, err: = rows.Columns()
if err != nil
{
slog.Error(err.Error())
return
}
if !
yield(index, row)
{
return
}
index += 1
}
return
}
}
func MapIterExcel(config ExcelTarget) iter.Seq[[] string]
{
return func(
yield func([] string) bool)
{
for _, value: = range MapIterExcel2(config)
{
if !
yield(value)
{
return
}
}
}
}
func IterExcel2(filePath string) iter.Seq2[int, [] string]
{
return func(
yield func(int, [] string) bool)
{
for index, value: = range MapIterExcel2(ExcelTarget
{
FilePath: filePath
})
{
if !
yield(index, value)
{
return
}
}
}
}
func IterExcel(filePath string) iter.Seq[[] string]
{
return func(
yield func([] string) bool)
{
for _, value: = range MapIterExcel2(ExcelTarget
{
FilePath: filePath
})
{
if !
yield(value)
{
return
}
}
}
}
func IterExcelSheet2(filePath string, sheetName string) iter.Seq2[int, [] string]
{
return func(
yield func(int, [] string) bool)
{
for index, value: = range MapIterExcel2(ExcelTarget
{
FilePath: filePath,
TargetSheet: sheetName,
})
{
if !
yield(index, value)
{
return
}
}
}
}
func IterExcelSheet(filePath string, sheetName string) iter.Seq[[] string]
{
return func(
yield func([] string) bool)
{
for _, value: = range MapIterExcel2(ExcelTarget
{
FilePath: filePath,
TargetSheet: sheetName,
})
{
if !
yield(value)
{
return
}
}
}
}
package util
import("dt/app/util/lineopt"
"errors"
"iter"
"sync")
func ChannelConsume[d any](queue chan d, job func(item d), number...int) * sync.WaitGroup
{
counter: = 10
if len(number) == 1 && number[0] > 0
{
counter = number[0]
}
return StartTogether(func()
{
for item: = range queue
{
job(item)
}
}, counter)
}
// Together 并行执行
func Together(job func(), counter int)
{
var wg sync.WaitGroup
for i: = 1;
i <= counter;
i++
{
wg.Add(1)
go func()
{
defer wg.Done()
job()
}()
}
wg.Wait()
}
func StartTogether(job func(), counter int) * sync.WaitGroup
{
var wg sync.WaitGroup
for i: = 1;
i <= counter;
i++
{
wg.Add(1)
go func()
{
defer wg.Done()
job()
}()
}
return &wg
}
// chanData pairs a produced value with the index the producer assigned to it,
// so both travel to consumers through a single channel.
type chanData[D any] struct {
	index int // index supplied alongside the value by the producer
	data  D   // the produced value itself
}
func ChannelConsume2[d any](queue chan chanData[d], job func(index int, item d)(err error), number...int) * sync.WaitGroup
{
counter: = 10
if len(number) == 1 && number[0] > 0
{
counter = number[0]
}
return StartTogether(func()
{
for item: = range queue
{
err: = job(item.index, item.data)
if errors.Is(err, lineopt.Stop)
{
// 目前不可以直接停止,会导致消费者阻塞掉
//return
}
}
}, counter)
}
// ProducerConsumer moves (index, item) pairs from a producer sequence p through
// queue to a pool of consumer goroutines that call c for each pair.
// It embeds a sync.Once, so it must not be copied; create it with
// TaskConsumer and use it through the returned pointer.
type ProducerConsumer[T any] struct {
	// consumerNumber is the worker count handed to ChannelConsume2.
	consumerNumber int
	// queue carries produced pairs to the consumers.
	queue chan chanData[T]
	// p produces the (index, item) pairs; installed via SetP.
	p iter.Seq2[int, T]
	// c consumes one pair; installed via SetC.
	c func(int, T) error
	// once makes Run execute the pipeline at most once.
	once sync.Once
}
func(itself * ProducerConsumer[T]) SetC(c func(index int, item T)(err error)) * ProducerConsumer[T]
{
itself.c = c
return itself
}
func(itself * ProducerConsumer[T]) SetP(p iter.Seq2[int, T]) * ProducerConsumer[T]
{
itself.p = p
return itself
}
// 生产者消费者都有可能发生阻塞,
// 生产者阻塞的原因是因为 queue 容量不够了
// 消费者阻塞的原因的是因为 queue 没有 close
// 生产者只需要实现即可
func(itself * ProducerConsumer[T]) do()
{
task: = ChannelConsume2(itself.queue, func(index int, item T)(err error)
{
return itself.c(index, item)
}, itself.consumerNumber)
defer task.Wait()
defer close(itself.queue)
for index,
v: = range itself.p
{
select
{
case itself.queue < -chanData[T]
{
index,
v,
}:
break
// 需要一个可以知道提前截止的操作
}
}
}
func(itself * ProducerConsumer[T]) Run()
{
itself.once.Do(func()
{
itself.do()
})
}
func TaskConsumer[T any](consumerNumber...int) * ProducerConsumer[T]
{
n: = 1
if len(consumerNumber) > 0
{
n = consumerNumber[0]
}
return &ProducerConsumer[T]
{
queue: make(chan chanData[T], n),
consumerNumber: n,
}
}
如果在上游先加一个预处理器把 Excel 处理掉（转换成更易读取的格式），剩下的事情就好办很多了……
P.S. 如果这是公司的代码，一般公司都不会允许随便发布到公开的地方。
或许没人管，或许也不是多好的代码，但还是要有职业素养……