目錄
三種快取的使用場景
Redis的使用場景和限制
LoadingCache的使用場景和限制
ReloadableCache的使用場景和限制
小結
reloadableCache的使用
快取擊穿
快取雪崩
首頁 資料庫 Redis 高並發技巧之Redis和本地快取使用技巧分享

高並發技巧之Redis和本地快取使用技巧分享

Nov 02, 2022 pm 05:35 PM
redis

這篇文章為大家帶來了關於Redis的相關知識,其中主要介紹的是分散式快取和本地快取的使用技巧,包括快取種類介紹,各種的使用場景,以及如何使用,最後再給實戰案例,下面一起來看一下,希望對大家有幫助。

推薦學習:Redis影片教學

#眾所周知,快取最主要的目的是加速訪問,緩解資料庫壓力。最常用的緩存就是分散式緩存,例如redis,在面對大部分並發場景或一些中小型公司流量沒有那麼高的情況,使用redis基本上都能解決了。但在流量較高的情況下可能得使用到本機快取了,例如guava的LoadingCache和快手開源的ReloadableCache。

三種快取的使用場景

這部分會介紹redis,像是guava的LoadingCache和快手開源的ReloadableCache的使用場景和限制,透過這部分的介紹就能知道在怎樣的業務場景下應該使用哪種緩存,以及為什麼。

Redis的使用場景和限制

如果寬泛的說redis何時使用,那麼自然就是用戶訪問量過高的地方使用,從而加速訪問,並且緩解資料庫壓力。如果細分的話,還得分為單節點問題和非單節點問題。

如果一個頁面使用者訪問量比較高,但是訪問的不是同一個資源。例如使用者詳情頁,訪問量比較高,但每個使用者的資料都是不一樣的,這種情況顯然只能用分散式快取了,如果使用redis,key為使用者唯一鍵,value則是使用者資訊。

redis導致的快取擊穿

但是要注意一點,一定要設定過期時間,而且不能設定到同一時間點過期。舉個例子,例如用戶又個活動頁,活動頁能看到用戶活動期間獲獎數據,粗心的人可能會設定用戶數據的過期時間點為活動結束,這樣會

#單(熱)點問題

單節點問題說的是redis的單一節點的並發問題,因為對於相同的key會落到redis集群的同一個節點上,那麼如果對這個key的訪問量過高,那麼這個redis節點就存在並發隱患,這個key就稱為熱key。

如果所有使用者存取的都是同一個資源,例如小愛同學app首頁對所有使用者展示的內容都一樣(初期),服務端給h5返回的是同一個大json,顯然得使用到緩存。首先我們考慮下用redis是否可行,由於redis存在單點問題,如果流量過大的話,那麼所有用戶的請求到達redis的同一個節點,需要評估該節點能否抗住這麼大流量。我們的規則是,如果單節點qps達到了千級就要解決單點問題了(即使redis號稱能抗住十萬級的qps),最常見的做法就是使用本地快取。顯然小愛app首頁流量不過百,使用redis是沒問題的。

LoadingCache的使用場景和限制

對於這上面說的熱key問題,我們最直接的做法就是使用本地緩存,例如你最熟悉的guava的LoadingCache,但是使用本地快取要求能夠接受一定的髒數據,因為如果你更新了首頁,本地快取是不會更新的,它只會根據一定的過期策略來重新加載緩存,不過在我們這個場景是完全沒問題的,因為一旦在後台推送了首頁後就不會再去改變了。即使改變了也沒問題,可以設定寫過期為半小時,超過半小時重新加載緩存,這種短時間內的髒數據我們是可以接受的。

LoadingCache導致的快取擊穿

雖然說本地快取和機器上強相關的,雖然程式碼層面寫的是半小時過期,但由於每台機器的啟動時間不同,導致快取的載入時間不同,過期時間也就不同,也就不會所有機器上的請求在同一時間快取失效後都去請求資料庫。但對於單一一台機器也是會導致快取穿透的,假如有10台機器,每台1000的qps,只要有一台快取過期就可能導致這1000個請求同時打到了資料庫。這個問題其實比較好解決,但是容易被忽略,也就是在設定LoadingCache的時候使用LoadingCache的load-miss方法,而不是直接判斷cache.getIfPresent()== null然後去請求db;前者會加虛擬機層面的鎖,保證只有一個請求打到資料庫去,從而完美的解決了這個問題。

但是,如果對於即時性要求較高的情況,例如有段時間要經常做活動,我要保證活動頁面能近實時更新,也就是運營在後台配置好了活動信息後,需要在C端近即時展示這次配置的活動訊息,此時使用LoadingCache肯定就不能滿足了。

ReloadableCache的使用場景和限制

對於上面說的LoadingCache不能解決的即時問題,可以考慮使用ReloadableCache,這是快手開源的一個本機快取框架,最大的特點是支援多機器同時更新緩存,假設我們修改了首頁信息,然後請求打到的是A機器,這個時候重新加載ReloadableCache,然後它會發出通知,監聽了同一zk節點的其他機器收到通知後重新更新緩存。使用這個緩存一般的要求是將全量資料載入到本地緩存,所以如果資料量過大肯定會對gc造成壓力,這種情況就不能使用了。由於小愛同學首頁這個首頁是帶有狀態的,一般online狀態的就那麼兩個,所以完全可以使用ReloadableCache來只裝載online狀態的首頁。

小結

到這裡三種快取基本上都介紹完了,做個小結:

  • 對於非熱點的數據訪問,例如用戶維度的數據,直接使用redis即可;
  • 對於熱點數據的訪問,如果流量不是很高,無腦使用redis即可;
  • 對於熱點數據,如果允許一定時間內的髒數據,使用LoadingCache即可;
  • 對於熱點數據,如果一致性要求較高,同時數據量不大的情況,使用ReloadableCache即可;
##小技巧

不管哪種本地快取雖然都帶有虛擬機器層面的加鎖來解決擊穿問題,但是意外總有可能以你意想不到的方式發生,保險起見你可以使用兩級緩存的方式即本地緩存redis db 。

快取使用的簡單介紹

這裡redis的使用就不再多說了,相信很多人對api的使用比我還熟悉

LoadingCache的使用

這個是guava提供的網上一抓一大把,但是給兩點注意事項

    要使用load-miss的話, 要么使用
  • V get(K key, Callable loader);要嘛使用build的時候使用的是build(CacheLoader loader)這個時候可以直接使用get( )了。另外建議使用load-miss,而不是getIfPresent==null的時候再去查資料庫,這可能會導致快取擊穿;
  • 使用load-miss是因為這是執行緒安全的,如果快取失效的話,多個執行緒呼叫get的時候只會有一個執行緒去db查詢,其他執行緒需要等待,也就是說這是執行緒安全的。
  • LoadingCache<String, String> cache = CacheBuilder.newBuilder()
                    .maximumSize(1000L)
                    .expireAfterAccess(Duration.ofHours(1L)) // 多久不访问就过期
                    .expireAfterWrite(Duration.ofHours(1L))  // 多久这个key没修改就过期
                    .build(new CacheLoader<String, String>() {
                        @Override
                        public String load(String key) throws Exception {
                            // 数据装载方式,一般就是loadDB
                            return key + " world";
                        }
                    });
    String value = cache.get("hello"); // 返回hello world
    登入後複製

reloadableCache的使用

導入三方依賴

<dependency>
  <groupId>com.github.phantomthief</groupId>
  <artifactId>zknotify-cache</artifactId>
  <version>0.1.22</version>
</dependency>
登入後複製

需要看文檔,不然無法使用,有興趣自己寫一個也行的。

public interface ReloadableCache<T> extends Supplier<T> {

    /**
     * 获取缓存数据
     */
    @Override
    T get();

    /**
     * 通知全局缓存更新
     * 注意:如果本地缓存没有初始化,本方法并不会初始化本地缓存并重新加载
     *
     * 如果需要初始化本地缓存,请先调用 {@link ReloadableCache#get()}
     */
    void reload();

    /**
     * 更新本地缓存的本地副本
     * 注意:如果本地缓存没有初始化,本方法并不会初始化并刷新本地的缓存
     *
     * 如果需要初始化本地缓存,请先调用 {@link ReloadableCache#get()}
     */
    void reloadLocal();
}
登入後複製

老生常談的快取擊穿/穿透/雪崩問題

這三個真的是亙古不變的問題,如果流量大確實需要考慮。

快取擊穿

簡單說就是快取失效,導致大量請求同一時間打到了資料庫。對於快取擊穿問題上面已經給了許多解決方案了。

    例如使用本機快取
  • 本機快取使用load-miss方法
  • #使用第三方服務來載入快取
1.2和都說過,主要來看3。假如業務願意只能使用redis而無法使用本地緩存,例如資料量過大,即時性需求比較高。那麼當快取失效的時候就得想辦法保證只有少量的請求打到資料庫。很自然的就想到了使用分散式鎖,理論上是可行的,但實際上有隱憂。我們的分散式鎖相信很多人都是使用redis lua的方式實現的,並且在while中進行了輪訓,這樣請求量大,數據多的話會導致無形中讓redis成了隱患,並且佔了太多業務線程,其實只是引入了分散式鎖就加大了複雜度,我們的原則就是能不用就不用。

那我們是不是可以設​​計一個類似分散式鎖定,但更可靠的rpc服務呢?當呼叫get方法的時候這個rpc服務保證相同的key打到同一個節點,並且使用synchronized來進行加鎖,之後完成資料的載入。在快手提供了一個叫cacheSetter的框架。下面提供一個簡易版,自己寫也很容易實現。

import com.google.common.collect.Lists;
import org.apache.commons.collections4.CollectionUtils;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;

/**
 * @Description 分布式加载缓存的rpc服务,如果部署了多台机器那么调用端最好使用id做一致性hash保证相同id的请求打到同一台机器。
 **/
public abstract class AbstractCacheSetterService implements CacheSetterService {

    private final ConcurrentMap<String, CountDownLatch> loadCache = new ConcurrentHashMap<>();

    private final Object lock = new Object();

    @Override
    public void load(Collection<String> needLoadIds) {
        if (CollectionUtils.isEmpty(needLoadIds)) {
            return;
        }
        CountDownLatch latch;
        Collection<CountDownLatch> loadingLatchList;
        synchronized (lock) {
            loadingLatchList = excludeLoadingIds(needLoadIds);

            needLoadIds = Collections.unmodifiableCollection(needLoadIds);

            latch = saveLatch(needLoadIds);
        }
        System.out.println("needLoadIds:" + needLoadIds);
        try {
            if (CollectionUtils.isNotEmpty(needLoadIds)) {
                loadCache(needLoadIds);
            }
        } finally {
            release(needLoadIds, latch);
            block(loadingLatchList);
        }

    }

    /**
     * 加锁
     * @param loadingLatchList 需要加锁的id对应的CountDownLatch
     */
    protected void block(Collection<CountDownLatch> loadingLatchList) {
        if (CollectionUtils.isEmpty(loadingLatchList)) {
            return;
        }
        System.out.println("block:" + loadingLatchList);
        loadingLatchList.forEach(l -> {
            try {
                l.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    /**
     * 释放锁
     * @param needLoadIds 需要释放锁的id集合
     * @param latch 通过该CountDownLatch来释放锁
     */
    private void release(Collection<String> needLoadIds, CountDownLatch latch) {
        if (CollectionUtils.isEmpty(needLoadIds)) {
            return;
        }
        synchronized (lock) {
            needLoadIds.forEach(id -> loadCache.remove(id));
        }
        if (latch != null) {
            latch.countDown();
        }
    }

    /**
     * 加载缓存,比如根据id从db查询数据,然后设置到redis中
     * @param needLoadIds 加载缓存的id集合
     */
    protected abstract void loadCache(Collection<String> needLoadIds);

    /**
     * 对需要加载缓存的id绑定CountDownLatch,后续相同的id请求来了从map中找到CountDownLatch,并且await,直到该线程加载完了缓存
     * @param needLoadIds 能够正在去加载缓存的id集合
     * @return 公用的CountDownLatch
     */
    protected CountDownLatch saveLatch(Collection<String> needLoadIds) {
        if (CollectionUtils.isEmpty(needLoadIds)) {
            return null;
        }
        CountDownLatch latch = new CountDownLatch(1);
        needLoadIds.forEach(loadId -> loadCache.put(loadId, latch));
        System.out.println("loadCache:" + loadCache);
        return latch;
    }

    /**
     * 哪些id正在加载数据,此时持有相同id的线程需要等待
     * @param ids 需要加载缓存的id集合
     * @return 正在加载的id所对应的CountDownLatch集合
     */
    private Collection<CountDownLatch> excludeLoadingIds(Collection<String> ids) {
        List<CountDownLatch> loadingLatchList = Lists.newArrayList();
        Iterator<String> iterator = ids.iterator();
        while (iterator.hasNext()) {
            String id = iterator.next();
            CountDownLatch latch = loadCache.get(id);
            if (latch != null) {
                loadingLatchList.add(latch);
                iterator.remove();
            }
        }
        System.out.println("loadingLatchList:" + loadingLatchList);
        return loadingLatchList;
    }
}
登入後複製

業務實作

import java.util.Collection;
public class BizCacheSetterRpcService extends AbstractCacheSetterService {
    @Override
    protected void loadCache(Collection<String> needLoadIds) {
        // 读取db进行处理
   	// 设置缓存
    }
}
登入後複製

快取穿透

#簡單來說就是請求的資料在資料庫中不存在,導致無效請求打穿資料庫。

解法也很簡單,從db取得資料的方法(getByKey(K key))一定要給個預設值。

例如我有個獎金池,金額上限是1W,用戶完成任務的時候給他發筆錢,並且使用redis記錄下來,並且落表,用戶在任務頁面能實時看到獎池剩餘金額,在任務開始的時候顯然獎池金額是不變的,redis和db裡面都沒有發放金額的記錄,這就導致每次必然都去查db,對於這種情況,從db沒查出來數據應該緩存個值0到緩存。

快取雪崩

就是大量快取集中失效打到了db,當然肯定都是一類的業務緩存,歸根到底是程式碼寫的有問題。可以將快取失效的過期時間打散,別讓其集中失效就可以了。

推薦學習:Redis影片教學

以上是高並發技巧之Redis和本地快取使用技巧分享的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

<🎜>:泡泡膠模擬器無窮大 - 如何獲取和使用皇家鑰匙
3 週前 By 尊渡假赌尊渡假赌尊渡假赌
北端:融合系統,解釋
3 週前 By 尊渡假赌尊渡假赌尊渡假赌

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

熱門話題

Java教學
1664
14
CakePHP 教程
1423
52
Laravel 教程
1319
25
PHP教程
1269
29
C# 教程
1248
24
redis集群模式怎麼搭建 redis集群模式怎麼搭建 Apr 10, 2025 pm 10:15 PM

Redis集群模式通過分片將Redis實例部署到多個服務器,提高可擴展性和可用性。搭建步驟如下:創建奇數個Redis實例,端口不同;創建3個sentinel實例,監控Redis實例並進行故障轉移;配置sentinel配置文件,添加監控Redis實例信息和故障轉移設置;配置Redis實例配置文件,啟用集群模式並指定集群信息文件路徑;創建nodes.conf文件,包含各Redis實例的信息;啟動集群,執行create命令創建集群並指定副本數量;登錄集群執行CLUSTER INFO命令驗證集群狀態;使

redis數據怎麼清空 redis數據怎麼清空 Apr 10, 2025 pm 10:06 PM

如何清空 Redis 數據:使用 FLUSHALL 命令清除所有鍵值。使用 FLUSHDB 命令清除當前選定數據庫的鍵值。使用 SELECT 切換數據庫,再使用 FLUSHDB 清除多個數據庫。使用 DEL 命令刪除特定鍵。使用 redis-cli 工具清空數據。

redis怎麼讀取隊列 redis怎麼讀取隊列 Apr 10, 2025 pm 10:12 PM

要從 Redis 讀取隊列,需要獲取隊列名稱、使用 LPOP 命令讀取元素,並處理空隊列。具體步驟如下:獲取隊列名稱:以 "queue:" 前綴命名,如 "queue:my-queue"。使用 LPOP 命令:從隊列頭部彈出元素並返回其值,如 LPOP queue:my-queue。處理空隊列:如果隊列為空,LPOP 返回 nil,可先檢查隊列是否存在再讀取元素。

centos redis如何配置Lua腳本執行時間 centos redis如何配置Lua腳本執行時間 Apr 14, 2025 pm 02:12 PM

在CentOS系統上,您可以通過修改Redis配置文件或使用Redis命令來限制Lua腳本的執行時間,從而防止惡意腳本佔用過多資源。方法一:修改Redis配置文件定位Redis配置文件:Redis配置文件通常位於/etc/redis/redis.conf。編輯配置文件:使用文本編輯器(例如vi或nano)打開配置文件:sudovi/etc/redis/redis.conf設置Lua腳本執行時間限制:在配置文件中添加或修改以下行,設置Lua腳本的最大執行時間(單位:毫秒)

redis命令行怎麼用 redis命令行怎麼用 Apr 10, 2025 pm 10:18 PM

使用 Redis 命令行工具 (redis-cli) 可通過以下步驟管理和操作 Redis:連接到服務器,指定地址和端口。使用命令名稱和參數向服務器發送命令。使用 HELP 命令查看特定命令的幫助信息。使用 QUIT 命令退出命令行工具。

redis計數器怎麼實現 redis計數器怎麼實現 Apr 10, 2025 pm 10:21 PM

Redis計數器是一種使用Redis鍵值對存儲來實現計數操作的機制,包含以下步驟:創建計數器鍵、增加計數、減少計數、重置計數和獲取計數。 Redis計數器的優勢包括速度快、高並發、持久性和簡單易用。它可用於用戶訪問計數、實時指標跟踪、遊戲分數和排名以及訂單處理計數等場景。

redis過期策略怎麼設置 redis過期策略怎麼設置 Apr 10, 2025 pm 10:03 PM

Redis數據過期策略有兩種:定期刪除:定期掃描刪除過期鍵,可通過 expired-time-cap-remove-count、expired-time-cap-remove-delay 參數設置。惰性刪除:僅在讀取或寫入鍵時檢查刪除過期鍵,可通過 lazyfree-lazy-eviction、lazyfree-lazy-expire、lazyfree-lazy-user-del 參數設置。

如何優化debian readdir的性能 如何優化debian readdir的性能 Apr 13, 2025 am 08:48 AM

在Debian系統中,readdir系統調用用於讀取目錄內容。如果其性能表現不佳,可嘗試以下優化策略:精簡目錄文件數量:盡可能將大型目錄拆分成多個小型目錄,降低每次readdir調用處理的項目數量。啟用目錄內容緩存:構建緩存機制,定期或在目錄內容變更時更新緩存,減少對readdir的頻繁調用。內存緩存(如Memcached或Redis)或本地緩存(如文件或數據庫)均可考慮。採用高效數據結構:如果自行實現目錄遍歷,選擇更高效的數據結構(例如哈希表而非線性搜索)存儲和訪問目錄信

See all articles