Home Backend Development Golang Detailed graphic and text explanation of log collection system implemented in go language

Detailed graphic and text explanation of log collection system implemented in go language

Nov 27, 2019 pm 03:47 PM
go language

Detailed graphic and text explanation of log collection system implemented in go language

I have sorted out the frame of this log collection system, as shown below

Detailed graphic and text explanation of log collection system implemented in go languageThe overall logic of the code to be implemented this time is:

Detailed graphic and text explanation of log collection system implemented in go languageThe complete code address is: https://github.com/pythonsite/logagent

etcd introduction

Highly available distributed Key-value storage can be used for configuration sharing and service discovery

Similar projects: zookeeper and consul

Development language: go

Interface: Provide restful interface, use Simple

Implementation algorithm: strong consistency based on raft algorithm, highly available service storage directory

application scenarios of etcd:

1. Service discovery and service registration

2. Configuration center (need to be used by the log collection client we implemented)

3. Distributed lock

4. Master election

Official website for etcd There is a very concise introduction:

Detailed graphic and text explanation of log collection system implemented in go language

##etcd build:

Download address: https://github.com/coreos/etcd/releases/
Based on Just download the corresponding version in your own environment and start it.

After starting, you can verify it with the following command:

[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl set name zhaofan 

zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl get name
zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]#
Copy after login

context Introduction and use

In fact, this thing is translated as context management. So what is the role of context? It mainly has the following two functions:

1. Control the timeout of goroutine

2 , Save context data

Understand through the following simple example:

package main

import (
    "fmt"
    "time"
    "net/http"
    "context"
    "io/ioutil"
)


type Result struct{
    r *http.Response
    err error
}

func process(){
    ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)
    defer cancel()
    tr := &http.Transport{}
    client := &http.Client{Transport:tr}
    c := make(chan Result,1)
    req,err := http.NewRequest("GET","http://www.google.com",nil)
    if err != nil{
        fmt.Println("http request failed,err:",err)
        return
    }
    // 如果请求成功了会将数据存入到管道中
    go func(){
        resp,err := client.Do(req)
        pack := Result{resp,err}
        c <- pack
    }()

    select{
    case <- ctx.Done():
        tr.CancelRequest(req)
        fmt.Println("timeout!")
    case res := <-c:
        defer res.r.Body.Close()
        out,_:= ioutil.ReadAll(res.r.Body)
        fmt.Printf("server response:%s",out)
    }
    return

}

func main() {
    process()
}
Copy after login

Write a context to save context through context, code example is:

package main

import (
    "github.com/Go-zh/net/context"
    "fmt"
)

func add(ctx context.Context,a,b int) int {
    traceId := ctx.Value("trace_id").(string)
    fmt.Printf("trace_id:%v\n",traceId)
    return a+b
}

func calc(ctx context.Context,a, b int) int{
    traceId := ctx.Value("trace_id").(string)
    fmt.Printf("trace_id:%v\n",traceId)
    //再将ctx传入到add中
    return add(ctx,a,b)
}

func main() {
    //将ctx传递到calc中
    ctx := context.WithValue(context.Background(),"trace_id","123456")
    calc(ctx,20,30)

}
Copy after login

Combining etcd and context

A simple example of connecting etcd through go: (There is a small problem that needs attention here is the startup method of etcd. The default startup may not connect, especially if you It is installed virtually, so you need to start it with the following command:

./etcd --listen-client-urls http://0.0.0.0:2371 --advertise-client-urls http://0.0.0.0: 2371 --listen-peer-urls http://0.0.0.0:2381
)

package main

import (
    etcd_client "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
)

func main() {
    cli, err := etcd_client.New(etcd_client.Config{
        Endpoints:[]string{"192.168.0.118:2371"},
        DialTimeout:5*time.Second,
    })
    if err != nil{
        fmt.Println("connect failed,err:",err)
        return
    }

    fmt.Println("connect success")
    defer cli.Close()
}
Copy after login

The following example is to connect etcd, save and get the value

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
)

func main() {
    cli,err := clientv3.New(clientv3.Config{
        Endpoints:[]string{"192.168.0.118:2371"},
        DialTimeout:5*time.Second,
    })
    if err != nil{
        fmt.Println("connect failed,err:",err)
        return
    }
    fmt.Println("connect succ")
    defer cli.Close()
    ctx,cancel := context.WithTimeout(context.Background(),time.Second)
    _,err = cli.Put(ctx,"logagent/conf/","sample_value")
    cancel()
    if err != nil{
        fmt.Println("put failed,err",err)
        return
    }
    ctx, cancel = context.WithTimeout(context.Background(),time.Second)
    resp,err := cli.Get(ctx,"logagent/conf/")
    cancel()
    if err != nil{
        fmt.Println("get failed,err:",err)
        return
    }
    for _,ev := range resp.Kvs{
        fmt.Printf("%s:%s\n",ev.Key,ev.Value)
    }
}
Copy after login

About context The official website also has a very useful example, which is used to control the exit of an opened goroutine. The code is as follows:

package main

import (
    "context"
    "fmt"
)

func main() {
    // gen generates integers in a separate goroutine and
    // sends them to the returned channel.
    // The callers of gen need to cancel the context once
    // they are done consuming generated integers not to leak
    // the internal goroutine started by gen.
    gen := func(ctx context.Context) <-chan int {
        dst := make(chan int)
        n := 1
        go func() {
            for {
                select {
                case <-ctx.Done():
                    return // returning not to leak the goroutine
                case dst <- n:
                    n++
                }
            }
        }()
        return dst
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // cancel when we are finished consuming integers

    for n := range gen(ctx) {
        fmt.Println(n)
        if n == 5 {
            break
        }
    }
}
Copy after login

Regarding the code example of the WithDeadline demonstration in the official website document:

package main


import (
    "context"
    "fmt"
    "time"
)

func main() {
    d := time.Now().Add(50 * time.Millisecond)
    ctx, cancel := context.WithDeadline(context.Background(), d)

    // Even though ctx will be expired, it is good practice to call its
    // cancelation function in any case. Failure to do so may keep the
    // context and its parent alive longer than necessary.
    defer cancel()

    select {
    case <-time.After(1 * time.Second):
        fmt.Println("overslept")
    case <-ctx.Done():
        fmt.Println(ctx.Err())
    }

}
Copy after login

Through the above code, we have A basic use, then if we use etcd for configuration management, if the configuration changes, how do we notify the corresponding server of the configuration change? The following example demonstrates:

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
)

func main() {
    cli,err := clientv3.New(clientv3.Config{
        Endpoints:[]string{"192.168.0.118:2371"},
        DialTimeout:5*time.Second,
    })
    if err != nil {
        fmt.Println("connect failed,err:",err)
        return
    }
    defer cli.Close()
    // 这里会阻塞
    rch := cli.Watch(context.Background(),"logagent/conf/")
    for wresp := range rch{
        for _,ev := range wresp.Events{
            fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        }
    }
}
Copy after login

Simple implementation of a kafka consumer code Example:

package main

import (
    "github.com/Shopify/sarama"
    "strings"
    "fmt"
    "time"
)

func main() {
    consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)
    if err != nil{
        fmt.Println("failed to start consumer:",err)
        return
    }
    partitionList,err := consumer.Partitions("nginx_log")
    if err != nil {
        fmt.Println("Failed to get the list of partitions:",err)
        return
    }
    fmt.Println(partitionList)
    for partition := range partitionList{
        pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)
            return
        }
        defer pc.AsyncClose()
        go func(partitionConsumer sarama.PartitionConsumer){
            for msg := range pc.Messages(){
                fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
            }
        }(pc)
    }
    time.Sleep(time.Hour)
    consumer.Close()

}
Copy after login

But the above code is not the best code because we end up waiting for the execution of goroutine through time.sleep. We can change it to implement it through sync.WaitGroup

package main

import (
    "github.com/Shopify/sarama"
    "strings"
    "fmt"
    "sync"
)

var (
    wg sync.WaitGroup
)

func main() {
    consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)
    if err != nil{
        fmt.Println("failed to start consumer:",err)
        return
    }
    partitionList,err := consumer.Partitions("nginx_log")
    if err != nil {
        fmt.Println("Failed to get the list of partitions:",err)
        return
    }
    fmt.Println(partitionList)
    for partition := range partitionList{
        pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)
            return
        }
        defer pc.AsyncClose()
        go func(partitionConsumer sarama.PartitionConsumer){
            wg.Add(1)
            for msg := range partitionConsumer.Messages(){
                fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
            }
            wg.Done()
        }(pc)
    }

    //time.Sleep(time.Hour)
    wg.Wait()
    consumer.Close()

}
Copy after login

Put the log information that the client needs to collect into etcd

The code for etcd processing is:

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "github.com/astaxie/beego/logs"
    "context"
    "fmt"
)

var Client *clientv3.Client
var logConfChan chan string


// 初始化etcd
func initEtcd(addr []string,keyfmt string,timeout time.Duration)(err error){

    var keys []string
    for _,ip := range ipArrays{
        //keyfmt = /logagent/%s/log_config
        keys = append(keys,fmt.Sprintf(keyfmt,ip))
    }

    logConfChan = make(chan string,10)
    logs.Debug("etcd watch key:%v timeout:%v", keys, timeout)

    Client,err = clientv3.New(clientv3.Config{
        Endpoints:addr,
        DialTimeout: timeout,
    })
    if err != nil{
        logs.Error("connect failed,err:%v",err)
        return
    }
    logs.Debug("init etcd success")
    waitGroup.Add(1)
    for _, key := range keys{
        ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)
        // 从etcd中获取要收集日志的信息
        resp,err := Client.Get(ctx,key)
        cancel()
        if err != nil {
            logs.Warn("get key %s failed,err:%v",key,err)
            continue
        }

        for _, ev := range resp.Kvs{
            logs.Debug("%q : %q\n",  ev.Key, ev.Value)
            logConfChan <- string(ev.Value)
        }
    }
    go WatchEtcd(keys)
    return
}

func WatchEtcd(keys []string){
    // 这里用于检测当需要收集的日志信息更改时及时更新
    var watchChans []clientv3.WatchChan
    for _,key := range keys{
        rch := Client.Watch(context.Background(),key)
        watchChans = append(watchChans,rch)
    }

    for {
        for _,watchC := range watchChans{
            select{
            case wresp := <-watchC:
                for _,ev:= range wresp.Events{
                    logs.Debug("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
                    logConfChan <- string(ev.Kv.Value)
                }
            default:

            }
        }
        time.Sleep(time.Second)
    }
    waitGroup.Done()
}

func GetLogConf()chan string{
    return logConfChan
}
Copy after login

Similarly, add the right limit here After all, the log collection program cannot affect the performance of the current business, so limit.go is added to limit the speed:

package main

import (
    "time"
    "sync/atomic"
    "github.com/astaxie/beego/logs"
)

type SecondLimit struct {
    unixSecond int64
    curCount int32
    limit int32
}

func NewSecondLimit(limit int32) *SecondLimit {
    secLimit := &SecondLimit{
        unixSecond:time.Now().Unix(),
        curCount:0,
        limit:limit,
    }
    return secLimit
}

func (s *SecondLimit) Add(count int) {
    sec := time.Now().Unix()
    if sec == s.unixSecond {
        atomic.AddInt32(&s.curCount,int32(count))
        return
    }
    atomic.StoreInt64(&s.unixSecond,sec)
    atomic.StoreInt32(&s.curCount, int32(count))
}

func (s *SecondLimit) Wait()bool {
    for {
        sec := time.Now().Unix()
        if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount == s.limit {
            time.Sleep(time.Microsecond)
            logs.Debug("limit is running,limit:%d s.curCount:%d",s.limit,s.curCount)
            continue
        }

        if sec != atomic.LoadInt64(&s.unixSecond) {
            atomic.StoreInt64(&s.unixSecond,sec)
            atomic.StoreInt32(&s.curCount,0)
        }
        logs.Debug("limit is exited")
        return false
    }
}
Copy after login

Recommended:

go language tutorial

The above is the detailed content of Detailed graphic and text explanation of log collection system implemented in go language. For more information, please follow other related articles on the PHP Chinese website!

Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn

Hot AI Tools

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Undress AI Tool

Undress AI Tool

Undress images for free

Clothoff.io

Clothoff.io

AI clothes remover

Video Face Swap

Video Face Swap

Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Article

Roblox: Bubble Gum Simulator Infinity - How To Get And Use Royal Keys
3 weeks ago By 尊渡假赌尊渡假赌尊渡假赌
Nordhold: Fusion System, Explained
4 weeks ago By 尊渡假赌尊渡假赌尊渡假赌
Mandragora: Whispers Of The Witch Tree - How To Unlock The Grappling Hook
3 weeks ago By 尊渡假赌尊渡假赌尊渡假赌

Hot Tools

Notepad++7.3.1

Notepad++7.3.1

Easy-to-use and free code editor

SublimeText3 Chinese version

SublimeText3 Chinese version

Chinese version, very easy to use

Zend Studio 13.0.1

Zend Studio 13.0.1

Powerful PHP integrated development environment

Dreamweaver CS6

Dreamweaver CS6

Visual web development tools

SublimeText3 Mac version

SublimeText3 Mac version

God-level code editing software (SublimeText3)

Hot Topics

Java Tutorial
1670
14
PHP Tutorial
1274
29
C# Tutorial
1256
24
How to solve the user_id type conversion problem when using Redis Stream to implement message queues in Go language? How to solve the user_id type conversion problem when using Redis Stream to implement message queues in Go language? Apr 02, 2025 pm 04:54 PM

The problem of using RedisStream to implement message queues in Go language is using Go language and Redis...

What should I do if the custom structure labels in GoLand are not displayed? What should I do if the custom structure labels in GoLand are not displayed? Apr 02, 2025 pm 05:09 PM

What should I do if the custom structure labels in GoLand are not displayed? When using GoLand for Go language development, many developers will encounter custom structure tags...

What is the problem with Queue thread in Go's crawler Colly? What is the problem with Queue thread in Go's crawler Colly? Apr 02, 2025 pm 02:09 PM

Queue threading problem in Go crawler Colly explores the problem of using the Colly crawler library in Go language, developers often encounter problems with threads and request queues. �...

In Go, why does printing strings with Println and string() functions have different effects? In Go, why does printing strings with Println and string() functions have different effects? Apr 02, 2025 pm 02:03 PM

The difference between string printing in Go language: The difference in the effect of using Println and string() functions is in Go...

What libraries are used for floating point number operations in Go? What libraries are used for floating point number operations in Go? Apr 02, 2025 pm 02:06 PM

The library used for floating-point number operation in Go language introduces how to ensure the accuracy is...

Which libraries in Go are developed by large companies or provided by well-known open source projects? Which libraries in Go are developed by large companies or provided by well-known open source projects? Apr 02, 2025 pm 04:12 PM

Which libraries in Go are developed by large companies or well-known open source projects? When programming in Go, developers often encounter some common needs, ...

What is the difference between `var` and `type` keyword definition structure in Go language? What is the difference between `var` and `type` keyword definition structure in Go language? Apr 02, 2025 pm 12:57 PM

Two ways to define structures in Go language: the difference between var and type keywords. When defining structures, Go language often sees two different ways of writing: First...

When using sql.Open, why does not report an error when DSN passes empty? When using sql.Open, why does not report an error when DSN passes empty? Apr 02, 2025 pm 12:54 PM

When using sql.Open, why doesn’t the DSN report an error? In Go language, sql.Open...

See all articles