• Go批量处理代码还能这样写?
  • 发布于 4天前
  • 53 热度
    13 评论
看到公司其他同学写的 go 批量处理代码,这风骚的感觉像在平行世界一样,还能这么玩?这里是一些脚本调用的地方,工具源码放在后面两个代码块了。
util.TaskConsumer[[]string](10).
SetP(lineopt.IterExcel2("xxx.xlsx")).
SetC(func(index int, row []string) (err error) {
if index == 1 {
return
}
// .....
// 这里是逻辑处理函数
return
}).
Run()
这是两个封装的函数的源码。
package lineopt
import("bufio"
    "fmt"
    "github.com/xuri/excelize/v2"
    "iter"
    "log/slog"
    "os")
func IterLine2(filePath string) iter.Seq2[int, string]
{
    return func(
        yield func(int, string) bool)
    {
        f, errF: = os.OpenFile(filePath, os.O_RDONLY, 0666)
        if errF != nil
        {
            return
        }
        defer func(f * os.File)
        {
            err: = f.Close()
            if err != nil
            {
                fmt.Println(err)
            }
        }(f)
        scanner: = bufio.NewScanner(f)
        index: = 1
        for scanner.Scan()
        {
            line: = scanner.Text()
            if !
            yield(index, line)
            {
                return
            }
            index += 1
        }
    }
}
func IterLine(filePath string) iter.Seq[string]
{
    return func(
        yield func(string) bool)
    {
        for _, item: = range IterLine2(filePath)
        {
            if !
            yield(item)
            {
                return
            }
        }
    }
}
func MapIterExcel2(config ExcelTarget) iter.Seq2[int, [] string]
{
    return func(
        yield func(int, [] string) bool)
    {
        f, err: = excelize.OpenFile(config.FilePath)
        if err != nil
        {
            slog.Error(err.Error())
            return
        }
        defer f.Close()
        targetSheet: = config.TargetSheet
        if targetSheet == ""
        {
            targetSheet = f.GetSheetName(0)
        }
        rows, err: = f.Rows(targetSheet)
        if err != nil
        {
            slog.Error(err.Error())
            return
        }
        index: = 1
        for rows.Next()
        {
            row, err: = rows.Columns()
            if err != nil
            {
                slog.Error(err.Error())
                return
            }
            if !
            yield(index, row)
            {
                return
            }
            index += 1
        }
        return
    }
}
func MapIterExcel(config ExcelTarget) iter.Seq[[] string]
{
    return func(
        yield func([] string) bool)
    {
        for _, value: = range MapIterExcel2(config)
        {
            if !
            yield(value)
            {
                return
            }
        }
    }
}
func IterExcel2(filePath string) iter.Seq2[int, [] string]
{
    return func(
        yield func(int, [] string) bool)
    {
        for index, value: = range MapIterExcel2(ExcelTarget
        {
            FilePath: filePath
        })
        {
            if !
            yield(index, value)
            {
                return
            }
        }
    }
}
func IterExcel(filePath string) iter.Seq[[] string]
{
    return func(
        yield func([] string) bool)
    {
        for _, value: = range MapIterExcel2(ExcelTarget
        {
            FilePath: filePath
        })
        {
            if !
            yield(value)
            {
                return
            }
        }
    }
}
func IterExcelSheet2(filePath string, sheetName string) iter.Seq2[int, [] string]
{
    return func(
        yield func(int, [] string) bool)
    {
        for index, value: = range MapIterExcel2(ExcelTarget
        {
            FilePath: filePath,
            TargetSheet: sheetName,
        })
        {
            if !
            yield(index, value)
            {
                return
            }
        }
    }
}
func IterExcelSheet(filePath string, sheetName string) iter.Seq[[] string]
{
    return func(
        yield func([] string) bool)
    {
        for _, value: = range MapIterExcel2(ExcelTarget
        {
            FilePath: filePath,
            TargetSheet: sheetName,
        })
        {
            if !
            yield(value)
            {
                return
            }
        }
    }
}
package util
import("dt/app/util/lineopt"
    "errors"
    "iter"
    "sync")
func ChannelConsume[d any](queue chan d, job func(item d), number...int) * sync.WaitGroup
    {
        counter: = 10
        if len(number) == 1 && number[0] > 0
        {
            counter = number[0]
        }
        return StartTogether(func()
        {
            for item: = range queue
            {
                job(item)
            }
        }, counter)
    }
    // Together 并行执行
func Together(job func(), counter int)
{
    var wg sync.WaitGroup
    for i: = 1;
    i <= counter;
    i++
    {
        wg.Add(1)
        go func()
        {
            defer wg.Done()
            job()
        }()
    }
    wg.Wait()
}
func StartTogether(job func(), counter int) * sync.WaitGroup
{
    var wg sync.WaitGroup
    for i: = 1;
    i <= counter;
    i++
    {
        wg.Add(1)
        go func()
        {
            defer wg.Done()
            job()
        }()
    }
    return &wg
}
type chanData[d any] struct
{
    index int
    data d
}
func ChannelConsume2[d any](queue chan chanData[d], job func(index int, item d)(err error), number...int) * sync.WaitGroup
{
    counter: = 10
    if len(number) == 1 && number[0] > 0
    {
        counter = number[0]
    }
    return StartTogether(func()
    {
        for item: = range queue
        {
            err: = job(item.index, item.data)
            if errors.Is(err, lineopt.Stop)
            {
                // 目前不可以直接停止,会导致消费者阻塞掉
                //return
            }
        }
    }, counter)
}
type ProducerConsumer[T any] struct
{
    consumerNumber int
    queue chan chanData[T]
    p iter.Seq2[int, T]
    c func(index int, item T)(err error)
    once sync.Once
}
func(itself * ProducerConsumer[T]) SetC(c func(index int, item T)(err error)) * ProducerConsumer[T]
{
    itself.c = c
    return itself
}
func(itself * ProducerConsumer[T]) SetP(p iter.Seq2[int, T]) * ProducerConsumer[T]
    {
        itself.p = p
        return itself
    }
    // 生产者消费者都有可能发生阻塞,
    // 生产者阻塞的原因是因为 queue 容量不够了
    // 消费者阻塞的原因的是因为 queue 没有 close
    // 生产者只需要实现即可
func(itself * ProducerConsumer[T]) do()
    {
        task: = ChannelConsume2(itself.queue, func(index int, item T)(err error)
        {
            return itself.c(index, item)
        }, itself.consumerNumber)
        defer task.Wait()
        defer close(itself.queue)
        for index,
        v: = range itself.p
        {
            select
            {
                case itself.queue < -chanData[T]
                {
                    index,
                    v,
                }:
                    break
                    // 需要一个可以知道提前截止的操作
            }
        }
    }
func(itself * ProducerConsumer[T]) Run()
{
    itself.once.Do(func()
    {
        itself.do()
    })
}
func TaskConsumer[T any](consumerNumber...int) * ProducerConsumer[T]
{
    n: = 1
    if len(consumerNumber) > 0
    {
        n = consumerNumber[0]
    }
    return &ProducerConsumer[T]
    {
        queue: make(chan chanData[T], n),
        consumerNumber: n,
    }
}

用户评论
  • 柠檬酸
  • 这种风骚属于脱离生态后不得已而为之,带有强迫的属性,所以是其他「受害者」。
    上游先有一个预处理器把 excel 干掉,剩下的事情就好办很多……
  • 2025/7/27 11:22:00 [ 0 ] [ 0 ] 回复
  • 原木风
  • 李明发  2025-07-27 11:15
    用了 Go 最新的 iterator 搞出了一些偏函数式风格的东西。似乎是为了方便并发处理
    P.S. 如果这是公司的代码,一般公司都不会允许随便发布到公开的地方

    P.S. 如果这是公司的代码,一般公司都不会允许随便发布到公开的地方

    或许没人管,或许也不是多好的代码,但是要有职业素养...


  • 2025/7/27 11:16:00 [ 0 ] [ 0 ] 回复
  • 李明发
  • 用了 Go 最新的 iterator 搞出了一些偏函数式风格的东西。似乎是为了方便并发处理
    P.S. 如果这是公司的代码,一般公司都不会允许随便发布到公开的地方
  • 2025/7/27 11:15:00 [ 0 ] [ 0 ] 回复