How to use Redis in Golang distributed applications
Redis is a high-performance in-memory database that is often used in distributed systems. Beyond serving as a distributed cache or a simple in-memory store, Redis has some special application scenarios. This article shows how to build the corresponding middleware in Golang.
Distributed lock
In a stand-alone system, we can use sync.Mutex to protect critical resources. The same need exists in a distributed system: when multiple hosts compete for the same resource, we need a corresponding "distributed lock".
In Redis we can achieve this with the SETNX command, which sets the value only if the key does not exist. If the set succeeds, the lock is acquired; if the key already exists, SETNX returns failure. The lock is released with DEL.
The main logic is as follows:
```go
type RedisLock struct {
	client     *redis.Client
	key        string
	expiration time.Duration // expiration time, guards against crashes and hung requests
}

func NewLock(client *redis.Client, key string, expiration time.Duration) *RedisLock {
	return &RedisLock{
		client:     client,
		key:        key,
		expiration: expiration,
	}
}

// Lock stores the caller's id in Redis on success.
func (l *RedisLock) Lock(id string) (bool, error) {
	return l.client.SetNX(context.TODO(), l.key, id, l.expiration).Result()
}

const unLockScript = `
if (redis.call("get", KEYS[1]) == KEYS[2]) then
	redis.call("del", KEYS[1])
	return true
end
return false
`

// UnLock uses a Lua script to guarantee atomicity; a caller can only
// release the lock it acquired itself.
func (l *RedisLock) UnLock(id string) error {
	_, err := l.client.Eval(context.TODO(), unLockScript, []string{l.key, id}).Result()
	if err != nil && err != redis.Nil {
		return err
	}
	return nil
}
```
To prevent deadlock caused by a system crash or an abnormal request, an expiration time must be added; it should be set to roughly twice the maximum estimated run time.
A Lua script is used to make unlocking atomic, so a caller only releases the lock it acquired itself, which avoids confusion caused by timeouts. For example: process A acquires the lock at time t1 but executes slowly, and the lock expires at time t2; process B then acquires the lock at t3. If process A finished and blindly deleted the key, it would release process B's lock.
Run the test
```go
func main() {
	client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "123456",
		DB:       0, // use default DB
	})
	lock := NewLock(client, "counter", 30*time.Second)
	counter := 0
	worker := func(i int) {
		for {
			id := fmt.Sprintf("worker%d", i)
			ok, err := lock.Lock(id)
			log.Printf("worker %d attempt to obtain lock, ok: %v, err: %v", i, ok, err)
			if !ok {
				time.Sleep(100 * time.Millisecond)
				continue
			}
			defer lock.UnLock(id)
			counter++
			log.Printf("worker %d, add counter %d", i, counter)
			break
		}
	}
	wg := sync.WaitGroup{}
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		id := i
		go func() {
			defer wg.Done()
			worker(id)
		}()
	}
	wg.Wait()
}
```
The output shows that the effect is similar to sync.Mutex:
```
2022/07/22 09:58:09 worker 5 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:09 worker 5, add counter 1
2022/07/22 09:58:09 worker 4 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 1 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 4 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 4, add counter 2
2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 1, add counter 3
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 2, add counter 4
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 3, add counter 5
```
Note in particular that in a distributed Redis cluster, if an exception occurs (for example, the master node goes down before the key is replicated), the reliability of the distributed lock may degrade. For stronger guarantees, the lock can be built on strongly consistent components such as etcd or ZooKeeper.
Distributed filter
Suppose we want to develop a crawler service that crawls millions of web pages. How do we judge whether a certain page has already been crawled? Besides using a database or a HashMap, we can use a Bloom filter. Compared with the other approaches, a Bloom filter occupies very little space and offers very fast insertion and lookup.
A Bloom filter tests whether an element belongs to a set, backed by a BitSet. When inserting, the value is hashed several times and the bit at each resulting position is set to 1. When querying, the value is hashed the same way and all corresponding bits are checked: if every bit is 1, the element is considered present.
A Bloom filter has a certain false-positive rate, so it is not suitable for exact-membership queries, and it does not support deleting elements. It is typically used in scenarios such as URL deduplication, spam filtering, and cache-penetration protection.
In Redis, we can use the built-in bitmap (SETBIT/GETBIT) as the BitSet, again relying on the atomicity of Lua scripts to avoid inconsistency across the multiple bit reads.
```go
const (
	// setScript inserts data by calling setbit on each position
	setScript = `
for _, offset in ipairs(ARGV) do
	redis.call("setbit", KEYS[1], offset, 1)
end
`
	// getScript queries data, returning true only if all bits are 1
	getScript = `
for _, offset in ipairs(ARGV) do
	if tonumber(redis.call("getbit", KEYS[1], offset)) == 0 then
		return false
	end
end
return true
`
)

type BloomFilter struct {
	client *redis.Client
	key    string // key under which the BitSet is stored in Redis
	bits   uint   // size of the BitSet
	maps   uint   // number of hash rounds
}

func NewBloomFilter(client *redis.Client, key string, bits, maps uint) *BloomFilter {
	client.Del(context.TODO(), key)
	if maps == 0 {
		maps = 14
	}
	return &BloomFilter{
		key:    key,
		client: client,
		bits:   bits,
		maps:   maps,
	}
}

// getLocations hashes the data multiple times to get the list of bit positions.
func (f *BloomFilter) getLocations(data []byte) []uint {
	locations := make([]uint, f.maps)
	for i := 0; i < int(f.maps); i++ {
		val := murmur3.Sum64(append(data, byte(i)))
		locations[i] = uint(val) % f.bits
	}
	return locations
}

func (f *BloomFilter) Add(data []byte) error {
	args := getArgs(f.getLocations(data))
	_, err := f.client.Eval(context.TODO(), setScript, []string{f.key}, args).Result()
	if err != nil && err != redis.Nil {
		return err
	}
	return nil
}

func (f *BloomFilter) Exists(data []byte) (bool, error) {
	args := getArgs(f.getLocations(data))
	resp, err := f.client.Eval(context.TODO(), getScript, []string{f.key}, args).Result()
	if err != nil {
		if err == redis.Nil {
			return false, nil
		}
		return false, err
	}
	exists, ok := resp.(int64)
	if !ok {
		return false, nil
	}
	return exists == 1, nil
}

func getArgs(locations []uint) []string {
	args := make([]string, 0)
	for _, l := range locations {
		args = append(args, strconv.FormatUint(uint64(l), 10))
	}
	return args
}
```
Run the test
```go
func main() {
	// note: the original used 2^16, but in Go ^ is XOR (2^16 == 18);
	// 1<<16 gives the intended 65536-bit filter
	bf := NewBloomFilter(client, "bf-test", 1<<16, 14)
	exists, err := bf.Exists([]byte("test1"))
	log.Printf("exist %t, err %v", exists, err)
	if err := bf.Add([]byte("test1")); err != nil {
		log.Printf("add err: %v", err)
	}
	exists, err = bf.Exists([]byte("test1"))
	log.Printf("exist %t, err %v", exists, err)
	exists, err = bf.Exists([]byte("test2"))
	log.Printf("exist %t, err %v", exists, err)
	// output
	// 2022/07/22 10:05:58 exist false, err <nil>
	// 2022/07/22 10:05:58 exist true, err <nil>
	// 2022/07/22 10:05:58 exist false, err <nil>
}
```
Distributed rate limiter
The golang.org/x/time/rate package provides a token-bucket rate limiter. To enforce limits across a distributed environment, the same logic can be implemented with a Redis Lua script.
The main principle of the token bucket is as follows:
Assume a token bucket with capacity burst, into which tokens are added at a rate of qps per second.
The bucket starts full, and overflowing tokens are simply discarded. A request for tokens is allowed if the bucket holds enough, and rejected otherwise.
When burst == qps, the qps limit is enforced strictly; when burst > qps, a certain amount of burst traffic is tolerated.
The implementation below mainly follows the official rate package, with the core logic rewritten in Lua:
```lua
--- Keys
--- the rate-limit key; a hash storing the current token count and timestamps
local limit_key = KEYS[1]

--- Arguments
--[[
qps:      requests per second
burst:    token bucket capacity
now:      current timestamp (seconds)
cost:     number of tokens requested
max_wait: maximum wait time (seconds)
--]]
local qps = tonumber(ARGV[1])
local burst = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local cost = tonumber(ARGV[4])
local max_wait = tonumber(ARGV[5])

--- current token count stored in Redis
local tokens = tonumber(redis.call("hget", limit_key, "token"))
if not tokens then
	tokens = burst
end

--- last update time
local last_time = tonumber(redis.call("hget", limit_key, "last_time"))
if not last_time then
	last_time = 0
end

--- latest wait deadline
local last_event = tonumber(redis.call("hget", limit_key, "last_event"))
if not last_event then
	last_event = 0
end

--- refill: derive the current token count from the elapsed time and qps
local delta = math.max(0, now - last_time)
local new_tokens = math.min(burst, delta * qps + tokens)
new_tokens = new_tokens - cost -- deduct the requested tokens

--- if the new token count is negative, compute the required wait time
local wait_period = 0
if new_tokens < 0 and qps > 0 then
	wait_period = wait_period - new_tokens / qps
end
wait_period = math.ceil(wait_period)
local time_act = now + wait_period -- timestamp at which the request may proceed

--- a request is allowed in two cases:
--- the requested tokens fit in burst and the wait does not exceed max_wait,
--- or qps is 0 and the new token count is still non-negative
--- (return 1/0 rather than a Lua boolean, since false converts to nil in Redis)
local ok = 0
if (cost <= burst and wait_period <= max_wait and qps > 0) or (qps == 0 and new_tokens >= 0) then
	ok = 1
end

--- persist the new state
--- (the original script used plain SET with undefined keys; HSET on the
--- hash fields read above keeps reads and writes consistent)
if ok == 1 then
	redis.call("hset", limit_key, "token", new_tokens)
	redis.call("hset", limit_key, "last_time", now)
	redis.call("hset", limit_key, "last_event", time_act)
end

--- return {allowed, wait time}
return {ok, wait_period}
```
On the Go side, the related interfaces Allow, AllowN, and Wait are all implemented by calling reserveN:
```go
// reserveN evaluates the Lua script.
func (lim *RedisLimiter) reserveN(now time.Time, n int, maxFutureReserveSecond int) (*Reservation, error) {
	// ...
	res, err := lim.rdb.Eval(context.TODO(), reserveNScript, []string{lim.limitKey},
		lim.qps, lim.burst, now.Unix(), n, maxFutureReserveSecond).Result()
	if err != nil && err != redis.Nil {
		return nil, err
	}
	// ...
	return &Reservation{
		ok:        allow == 1,
		lim:       lim,
		tokens:    n,
		timeToAct: now.Add(time.Duration(wait) * time.Second),
	}, nil
}
```
Run the test
```go
func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "123456",
		DB:       0, // use default DB
	})
	r, err := NewRedisLimiter(rdb, 1, 2, "testrate")
	if err != nil {
		log.Fatal(err)
	}
	r.Reset()
	for i := 0; i < 5; i++ {
		err := r.Wait(context.TODO())
		log.Printf("worker %d allowed: %v", i, err)
	}
}

// output
// 2022/07/22 12:50:31 worker 0 allowed: <nil>
// 2022/07/22 12:50:31 worker 1 allowed: <nil>
// 2022/07/22 12:50:32 worker 2 allowed: <nil>
// 2022/07/22 12:50:33 worker 3 allowed: <nil>
// 2022/07/22 12:50:34 worker 4 allowed: <nil>
```
The first two requests fall within burst and are granted immediately; subsequent requests are served at the qps rate.
Other uses
Redis can also be used for global counters, deduplication, publish/subscribe, and other scenarios. In addition, Redis provides official modules that can be loaded to add features such as filtering and rate limiting.
The above is the detailed content of How to use Redis in Golang distributed applications. For more information, please follow other related articles on the PHP Chinese website!