Go批量操作excel導(dǎo)入到mongodb的技巧
需求:完成一個(gè)命令工具,批量處理某個(gè)目錄下面的一些excel,將這些excel數(shù)據(jù)導(dǎo)入到mongodb,同時(shí)可以同步到mysql
:: 花了一天時(shí)間寫完代碼,代碼庫位置:https://gitee.com/foz/lib/tree/master/ecc
代碼目錄:
├─cmd | └─ecc.go # 命令 ├─configs ├─data ├─internal │ └─importing # 主要邏輯處理 ├─pkg # 處理文件讀取、連接數(shù)據(jù)庫等 │ ├─files │ ├─mongo │ └─mysql ├─queue └─tools
1. 選擇命令行包
平常使用的的命令工具包有:
- urfave/cli
- spf13/cobra
這里使用的是urfave/cli包,比較簡(jiǎn)單
var DirPath = "../data" // 默認(rèn)位置
var dir = DirPath
app := &cli.App{
Name: "Ecc",
Usage: "Ecc is a tools for batch processing of excel data",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "model",
Aliases: []string{"m"},
Usage: "The model of searching",
Value: "model",
Destination: &model,
},
&cli.StringFlag{ // 設(shè)置一個(gè) -d 的參數(shù),用來確定目標(biāo)文件夾位置
Name: "dir",
Aliases: []string{"d"},
Usage: "Folder location of data files",
Destination: &dir,
Value: DirPath,
},
Action: func(c *cli.Context) error {
importing.Load("../configs/cfg.yaml") // 引入配置文件,讀取mongodb、mysql等配置
importing.Handle(dir) ## 具體邏輯處理
return nil
}2. 讀取配置,連接數(shù)據(jù)庫
讀取配置使用spf13/viper庫,需要讀取一下配置,連接mongodb
var C Config
type Config struct {
Env string `yaml:"env"`
Mongo struct {
DNS string `yaml:"dns"`
Db string `yaml:"db"`
Collection string `yaml:"collection"`
} `yaml:"mongo"`
Mysql struct {
Alias string `yaml:"alias"`
Dns string `yaml:"dns"`
} `yaml:"mysql"`
}
func Load(cf string) {
var err error
viper.SetConfigFile(cf)
if err = viper.ReadInConfig(); err != nil {
log.Fatal(fmt.Errorf("fatal error config file: %s \n", err))
}
if err = viper.Unmarshal(&configs.C); err != nil {
log.Fatal(fmt.Errorf("unmarshal conf failed, err:%s \n", err))
if err = mongo.Conn(configs.C.Mongo.DNS, configs.C.Mongo.Db); err != nil {
log.Fatal(color.RedString("%s:\n%v", "mongo connect err", err))
if mongo.CheckCollection(configs.C.Mongo.Collection) {
if err = mongo.DelCollection(configs.C.Mongo.Collection); err != nil {
log.Fatal(color.RedString("%s:\n%v", "mongo del collection err", err))
}
if err = mongo.CreateCollection(configs.C.Mongo.Collection); err != nil {
log.Fatal(color.RedString("%s:\n%v", "mongo create collection err", err))3. 讀取文件
先確定文件權(quán)限以及文件是否存在
func ReadDir(dir string) ([]os.FileInfo, error) {
perm := checkPermission(dir)
if perm == true {
return nil, fmt.Errorf("permission denied dir: %s", dir)
}
if isNotExistDir(dir) {
return nil, fmt.Errorf("does not exist dir: %s", dir)
files, err := ioutil.ReadDir(dir)
if err == nil {
return files, err
return nil, fmt.Errorf("ReadDir: %s, err: %v", dir, err)
}拿到文件后就要并發(fā)讀取每個(gè)excel文件數(shù)據(jù)
這里需求是一次任務(wù)必須讀完所有的文件,任何一個(gè)文件有錯(cuò)誤就退出程序。
:: 所以需要定義異常退出信道和一個(gè)完成讀取兩個(gè)信道,總的數(shù)據(jù)使用sync.Map安全并發(fā)寫入。
3.1. 并發(fā)讀
rWait = true
rDone = make(chan struct{})
rCrash = make(chan struct{})
read(f, dir, data)
for rWait { // 使用for循環(huán)來阻塞讀文件
select {
case <-rCrash:
abort("-> Failure")
return
case <-rDone:
rWait = false
}
}
func read(fs []os.FileInfo, dir string, data *sync.Map) {
for _, file := range fs {
fileName := file.Name()
_ext := filepath.Ext(fileName)
if Include(strings.Split(Exts, ","), _ext) {
wg.Add(1)
inCh := make(chan File)
go func() {
defer wg.Done()
select {
case <-rCrash:
return // 退出goroutine
case f := <-inCh:
e, preData := ReadExcel(f.FilePath, f.FileName, pb)
if e != nil {
tools.Red("%v", e)
// 使用sync.once防止多個(gè)goroutine關(guān)閉同一個(gè)信道
once.Do(func() {
close(rCrash)
})
return
}
data.Store(f.FileName, preData)
}
}()
inCh <- File{
FileName: fileName,
FilePath: dir + string(os.PathSeparator) + fileName,
}
go func() {
wg.Wait()
close(rDone)
}()3.2. 使用excelize處理excel
excelize是一個(gè)非常好用的excel處理庫,這里使用這個(gè)庫讀取excel文件內(nèi)容
type ExcelPre struct {
FileName string
Data [][]string
Fields []string
Prefixes string
ProgressBar *mpb.Bar // 進(jìn)度條對(duì)象
}
func ReadExcel(filePath, fileName string, pb *mpb.Progress) (err error, pre *ExcelPre) {
f, err := excelize.OpenFile(filePath)
if err != nil {
return err, nil
}
defer func() {
if _e := f.Close(); _e != nil {
fmt.Printf("%s: %v.\n\n", filePath, _e)
}
}()
// 獲取第一頁數(shù)據(jù)
firstSheet := f.WorkBook.Sheets.Sheet[0].Name
rows, err := f.GetRows(firstSheet)
lRows := len(rows)
if lRows < 2 {
lRows = 2
rb := ReadBar(lRows, filePath, pb)
wb := WriteBar(lRows-2, filePath, rb, pb)
var fields []string
var data [][]string
// 進(jìn)度條增加一格
InCr := func(start time.Time) {
rb.Increment()
rb.DecoratorEwmaUpdate(time.Since(start))
for i := 0; i < lRows; i++ {
InCr(time.Now())
// 這里對(duì)第一行處理,用來判斷一些約定的條件
if i == 0 {
fields = rows[i]
for index, field := range fields {
if isChinese := regexp.MustCompile("[\u4e00-\u9fa5]"); isChinese.MatchString(field) || field == "" {
err = errors.New(fmt.Sprintf("%s: line 【A%d】 field 【%s】 \n", filePath, index, field) + "The first line of the file is not a valid attribute name.")
return err, nil
}
}
continue
// 過濾第二行,這一行通常是中文解釋字段
if i == 1 {
data = append(data, rows[i])
return nil, &ExcelPre{
FileName: fileName,
Data: data,
Fields: fields,
Prefixes: Prefix(fileName),
ProgressBar: wb,3.3. 使用mpb在命令行輸出進(jìn)度顯示
mpb是一個(gè)很好用的命令行進(jìn)度輸出庫,上面代碼里里有兩個(gè)進(jìn)度條,一個(gè)是讀進(jìn)度條,第二個(gè)是寫進(jìn)度條,讀進(jìn)度條在文件讀取的時(shí)候就顯示了,返回的結(jié)構(gòu)體里有寫進(jìn)度條對(duì)象,便于后面寫操作時(shí)候顯示。
下面是兩個(gè)進(jìn)度條顯示的配置,具體參數(shù)可以看這個(gè)庫的文檔。
func ReadBar(total int, name string, pb *mpb.Progress) *mpb.Bar {
return pb.AddBar(int64(total),
mpb.PrependDecorators(
decor.OnComplete(decor.Name(color.YellowString("reading"), decor.WCSyncSpaceR), color.YellowString("waiting")),
decor.CountersNoUnit("%d / %d", decor.WCSyncWidth, decor.WCSyncSpaceR),
),
mpb.AppendDecorators(
decor.NewPercentage("%.2f:", decor.WCSyncSpaceR),
decor.EwmaETA(decor.ET_STYLE_MMSS, 0, decor.WCSyncWidth),
decor.Name(": "+name),
)
}
func WriteBar(total int, name string, beforeBar *mpb.Bar, pb *mpb.Progress) *mpb.Bar {
mpb.BarQueueAfter(beforeBar, false),
mpb.BarFillerClearOnComplete(),
decor.OnComplete(decor.Name(color.YellowString("writing"), decor.WCSyncSpaceR), color.GreenString("done")),
decor.OnComplete(decor.CountersNoUnit("%d / %d", decor.WCSyncSpaceR), ""),
decor.OnComplete(decor.NewPercentage("%.2f:", decor.WCSyncSpaceR), ""),
decor.OnComplete(decor.EwmaETA(decor.ET_STYLE_MMSS, 0, decor.WCSyncWidth), ""),
decor.OnComplete(decor.Name(": "+name), name),4. 寫入mongodb
同寫入操作,這里拿到所有數(shù)據(jù),然后使用goroutine并發(fā)寫入mongodb,在處理數(shù)據(jù)時(shí)候需要查重,還需要記錄一下本次操作插入了哪些數(shù)據(jù)的_id值,在報(bào)錯(cuò)的時(shí)候進(jìn)行刪除(這里可以使用事務(wù),直接刪除簡(jiǎn)單些),所以定義了一個(gè)Shuttle結(jié)構(gòu)體用來在記錄并發(fā)時(shí)的數(shù)據(jù)。
wWait = true
wDone = make(chan struct{})
wCrash = make(chan struct{})
type Shuttle struct {
Hid []string // 用來判斷是否是重復(fù)數(shù)據(jù)
Mid []string // 用來記錄本次插入的數(shù)據(jù)_id
mu sync.Mutex
}
func (s *Shuttle) Append(t string, str string) {
s.mu.Lock()
defer s.mu.Unlock()
switch t {
case "h":
s.Hid = append(s.Hid, str)
case "m":
s.Mid = append(s.Mid, str)
}
write2mongo(data)
for wWait {
select {
case <-wCrash:
abort("-> Failure")
return
case <-wDone:
wWait = false
func write2mongo(data *sync.Map) {
collection := mongo.GetCollection(configs.C.Mongo.Collection)
data.Range(func(key, value interface{}) bool {
if v, ok := value.(*ExcelPre); ok {
wg.Add(1)
inCh := make(chan []bson.M)
go func() {
defer wg.Done()
select {
case <-wCrash:
return // exit
case rows := <-inCh:
e := Write2Mongo(rows, collection, v, &shuttle)
if e != nil {
tools.Red("%v", e)
once.Do(func() {
close(wCrash)
})
return
}
}
}()
inCh <- PreWrite(v)
}
return true
})
go func() {
wg.Wait()
close(wDone)
}()
// 具體處理邏輯
func Write2Mongo(rows []bson.M, collection *mongoDb.Collection, v *ExcelPre, s *Shuttle) error {
v.ProgressBar.SetCurrent(0)
incr := func(t time.Time, b *mpb.Bar, n int64) {
b.IncrInt64(n)
b.DecoratorEwmaUpdate(time.Since(t))
for _, row := range rows {
start := time.Now()
key := v.Prefixes + "@@" + row["_hid"].(string)
s.mu.Lock()
if Include(s.Hid, key) {
s.mu.Unlock()
incr(start, v.ProgressBar, 1)
continue
} else {
s.Hid = append(s.Hid, key)
var err error
var id primitive.ObjectID
if id, err = mongo.CreateDocs(collection, row); err != nil {
return errors.New(fmt.Sprintf("%s:\n%v", "mongo create docs err", err))
s.Append("m", id.Hex())
incr(start, v.ProgressBar, 1)
return nil5. 同步mysql
因?yàn)橥絤ysql不是必要的,這里使用命令行輸入進(jìn)行判斷:
tools.Yellow("-> Whether to sync data to mysql? (y/n)")
if !tools.Scan("aborted") {
return
} else {
tools.Yellow("-> Syncing data to mysql...")
if err = write2mysql(); err != nil {
tools.Red("-> Failure:" + err.Error())
} else {
tools.Green("-> Success.")
}
}連接mysql數(shù)據(jù)庫,拿到當(dāng)前monogodb的數(shù)據(jù):
func write2mysql() error {
if err := mysql.Conn(configs.C.Mysql.Dns); err != nil {
return err
}
d, err := mongo.GetCollectionAllData(configs.C.Mongo.Collection)
if err != nil {
err = Write2Mysql(d)
return err
}創(chuàng)建表,直接拼sql就完事了:
func CreateTable(tableName string, fields []string) error {
var err error
delSql := fmt.Sprintf("DROP TABLE IF EXISTS `%s`", tableName)
err = Db.Exec(delSql).Error
if err != nil {
return err
}
s := "id bigint(20) NOT NULL PRIMARY KEY"
for _, field := range fields {
s += fmt.Sprintf(",%s varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL", field)
sql := fmt.Sprintf("CREATE TABLE `%s` (%s) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci", tableName, s)
err = Db.Exec(sql).Error
return nil
}插入數(shù)據(jù),bson.M本身就是一個(gè)map,轉(zhuǎn)一下使用gorm分批插入數(shù)據(jù),速度快一點(diǎn):
func InsertData(tableName string, fields []string, data []bson.M) error {
var err error
var maps []map[string]interface{}
for _, d := range data {
row := make(map[string]interface{})
for _, field := range fields {
row[field] = d[field]
}
if row != nil {
row["id"] = d["id"].(string)
maps = append(maps, row)
}
if len(maps) > 0 {
err = Db.Table(tableName).CreateInBatches(maps, 100).Error
if err != nil {
return err
return err
}6. 總結(jié)
做為golang新手,看了很多文檔、文章,好似懂了,其實(shí)啥都不懂。
到此這篇關(guān)于Go批量操作excel導(dǎo)入到mongo的文章就介紹到這了,更多相關(guān)Go excel導(dǎo)入mongo內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語言中io.Reader和io.Writer的詳解與實(shí)現(xiàn)
在Go語言的實(shí)際編程中,幾乎所有的數(shù)據(jù)結(jié)構(gòu)都圍繞接口展開,接口是Go語言中所有數(shù)據(jù)結(jié)構(gòu)的核心。在使用Go語言的過程中,無論你是實(shí)現(xiàn)web應(yīng)用程序,還是控制臺(tái)輸入輸出,又或者是網(wǎng)絡(luò)操作,不可避免的會(huì)遇到IO操作,使用到io.Reader和io.Writer接口。下面來詳細(xì)看看。2016-09-09
go程序測(cè)試CPU占用率統(tǒng)計(jì)ps?vs?top兩種不同方式對(duì)比
這篇文章主要為大家介紹了go程序測(cè)試CPU占用率統(tǒng)計(jì)ps?vs?top兩種不同方式對(duì)比,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05
Golang實(shí)現(xiàn)http server提供壓縮文件下載功能
這篇文章主要介紹了Golang實(shí)現(xiàn)http server提供壓縮文件下載功能,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01

