Golang監(jiān)聽日志文件并發(fā)送到kafka中
前言
日志收集項(xiàng)目的準(zhǔn)備中,本文主要講的是利用golang的tail庫(kù),監(jiān)聽日志文件的變動(dòng),將日志信息發(fā)送到kafka中。
涉及的golang庫(kù)和可視化工具:
go-ini,sarama,tail其中:
go-ini:用于讀取配置文件,統(tǒng)一管理配置項(xiàng),有利于后其的維護(hù)sarama:是一個(gè)go操作kafka的客戶端。目前我用于向kefka發(fā)送消息tail:類似于linux的tail命令了,讀取文件的后幾行。如果文件有追加數(shù)據(jù),會(huì)檢測(cè)到。就是通過它來監(jiān)聽日志文件
可視化工具:
offsetexplorer:是kafka的可視化工具,這里用來查看消息是否投遞成功
工作的流程
- 加載配置,初始化
sarama和kafka。 - 起一個(gè)的協(xié)程,利用
tail不斷去監(jiān)聽日志文件的變化。 - 主協(xié)程中一直阻塞等待
tail發(fā)送消息,兩者通過一個(gè)管道通訊。一旦主協(xié)程接收到新日志,組裝格式,然后發(fā)送到kafka中

環(huán)境準(zhǔn)備
環(huán)境的話,確保zookeeper和kafka正常運(yùn)行。因?yàn)檫€沒有使用sarama讀取數(shù)據(jù),使用offsetexplorer來查看任務(wù)是否真的投遞成功了。
代碼分層
serve來存放寫tail服務(wù)類和sarama服務(wù)類,conf存放ini配置文件
main函數(shù)為程序入口

關(guān)鍵的代碼
main.go
main函數(shù)做的有:構(gòu)建配置結(jié)構(gòu)體,映射配置文件。調(diào)用和初始化tail,srama服務(wù)。
package main
import (
"fmt"
"sarama/serve"
"github.com/go-ini/ini"
)
type KafkaConfig struct {
Address string `ini:"address"`
ChannelSize int `ini:"chan_size"`
}
type TailConfig struct {
Path string `ini:"path"`
Filename string `ini:"fileName"`
// 如果是結(jié)構(gòu)體,則指明分區(qū)名
Children `ini:"tailfile.children"`
}
type Config struct {
KafkaConfig `ini:"kafka"`
TailConfig `ini:"tailfile"`
}
type Children struct {
Name string `ini:"name"`
}
func main() {
// 加載配置
var cfg = new(Config)
err := ini.MapTo(cfg, "./conf/go-conf.ini")
if err != nil {
fmt.Print(err)
}
// 初始化kafka
ks := &serve.KafukaServe{}
// 啟動(dòng)kafka消息監(jiān)聽。異步
ks.InitKafka([]string{cfg.KafkaConfig.Address}, int64(cfg.KafkaConfig.ChannelSize))
// 關(guān)閉主協(xié)程時(shí),關(guān)閉channel
defer ks.Destruct()
// 初始化tail
ts := &serve.TailServe{}
ts.TailInit(cfg.TailConfig.Path + "/" + cfg.TailConfig.Filename)
// 阻塞
ts.Listener(ks.MsgChan)
}
kafka.go
有3個(gè)方法 :
InitKafka,組裝配置項(xiàng)以及初始化接收消息的管道,Listener,監(jiān)聽管道消息,收到消息后,將消息組裝,發(fā)送到kafkaDestruct, 關(guān)閉管道
package serve
import (
"fmt"
"github.com/Shopify/sarama"
)
type KafukaServe struct {
MsgChan chan string
//err error
}
func (ks *KafukaServe) InitKafka(addr []string, chanSize int64) {
// 讀取配置
config := sarama.NewConfig()
// 1. 初始化生產(chǎn)者配置
config.Producer.RequiredAcks = sarama.WaitForAll
// 選擇分區(qū)
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 成功交付的信息
config.Producer.Return.Successes = true
ks.MsgChan = make(chan string, chanSize)
go ks.Listener(addr, chanSize, config)
}
func (ks *KafukaServe) Listener(addr []string, chanSize int64, config *sarama.Config) {
// 連接kafka
var kafkaClient, _ = sarama.NewSyncProducer(addr, config)
defer kafkaClient.Close()
for {
select {
case content := <-ks.MsgChan:
//
msg := &sarama.ProducerMessage{
Topic: "weblog",
Value: sarama.StringEncoder(content),
}
partition, offset, err := kafkaClient.SendMessage(msg)
if err != nil {
fmt.Println(err)
}
fmt.Println("分區(qū),偏移量:")
fmt.Println(partition, offset)
fmt.Println("___")
}
}
}
func (ks *KafukaServe) Destruct() {
close(ks.MsgChan)
}
tail.go
主要包括了兩個(gè)方法:
TailInit初始化,組裝tail配置。ListenerListener,保存kafka服務(wù)類初始化之后的管道。監(jiān)聽日志文件,如果有新日志,就往管道里發(fā)送
package serve
import (
"fmt"
"github.com/hpcloud/tail"
)
type TailServe struct {
tails *tail.Tail
}
func (ts *TailServe) TailInit(filenName string) {
config := tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{Offset: 0, Whence: 2},
MustExist: false,
Poll: true,
}
// 打開文件開始讀取數(shù)據(jù)
ts.tails, _ = tail.TailFile(filenName, config)
// if err != nil {
// fmt.Println("tails %s failed,err:%v\n", filenName, err)
// return nil, err
// }
fmt.Println("啟動(dòng)," + filenName + "監(jiān)聽")
}
func (ts *TailServe) Listener(MsgChan chan string) {
for {
msg, ok := <-ts.tails.Lines
if !ok {
// todo
fmt.Println("數(shù)據(jù)接收失敗")
return
}
fmt.Println(msg.Text)
MsgChan <- msg.Text
}
}
// 測(cè)試案例
func Demo() {
filename := `E:\xx.log`
config := tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{Offset: 0, Whence: 2},
MustExist: false,
Poll: true,
}
// 打開文件開始讀取數(shù)據(jù)
tails, err := tail.TailFile(filename, config)
if err != nil {
fmt.Println("tails %s failed,err:%v\n", filename, err)
return
}
var (
msg *tail.Line
ok bool
)
fmt.Println("啟動(dòng)")
for {
msg, ok = <-tails.Lines
if !ok {
fmt.Println("tails file close reopen,filename:$s\n", tails.Filename)
}
fmt.Println("msg:", msg.Text)
}
}
到此這篇關(guān)于Golang監(jiān)聽日志文件并發(fā)送到kafka中的文章就介紹到這了,更多相關(guān)Golang 監(jiān)聽日志文件 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
VsCode搭建Go語言開發(fā)環(huán)境的配置教程
這篇文章主要介紹了在VsCode中搭建Go開發(fā)環(huán)境的配置教程,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-05-05
Go語言非main包編譯為靜態(tài)庫(kù)并使用的示例代碼
本文以Windows為例,介紹一下如何將Go的非main包編譯為靜態(tài)庫(kù),用戶又將如何使用。通過實(shí)際項(xiàng)目創(chuàng)建常規(guī)工程,通過示例代碼給大家介紹的非常詳細(xì),需要的朋友參考下吧2021-07-07
Golang之sync.Pool對(duì)象池對(duì)象重用機(jī)制總結(jié)
這篇文章主要對(duì)Golang的sync.Pool對(duì)象池對(duì)象重用機(jī)制做了一個(gè)總結(jié),文中有相關(guān)的代碼示例和圖解,具有一定的參考價(jià)值,需要的朋友可以參考下2023-07-07
golang高性能的http請(qǐng)求 fasthttp詳解
fasthttp 是 Go 的快速 HTTP 實(shí)現(xiàn),當(dāng)前在 1M 并發(fā)的生產(chǎn)環(huán)境使用非常成功,可以從單個(gè)服務(wù)器進(jìn)行 100K qps 的持續(xù)連接,總而言之,fasthttp 比 net/http 快 10 倍,下面通過本文給大家介紹golang fasthttp http請(qǐng)求的相關(guān)知識(shí),一起看看吧2021-09-09

