千萬級資料並行如何處理?進入學習
推薦學習:
眾所周知,快取最主要的目的就是加速存取,緩解資料庫壓力。最常用的快取就是分散式快取,比如redis,在面對大部分並行場景或者一些中小型公司流量沒有那麼高的情況,使用redis基本都能解決了。但是在流量較高的情況下可能得使用到本地快取了,比如guava的LoadingCache和快手開源的ReloadableCache。
這部分會介紹redis,比如guava的LoadingCache和快手開源的ReloadableCache的使用場景和侷限,通過這一部分的介紹就能知道在怎樣的業務場景下應該使用哪種快取,以及為什麼。
如果寬泛的說redis何時使用,那麼自然就是使用者存取量過高的地方使用,從而加速存取,並且緩解資料庫壓力。如果細分的話,還得分為單節點問題和非單節點問題。
如果一個頁面使用者存取量比較高,但是存取的不是同一個資源。比如使用者詳情頁,存取量比較高,但是每個使用者的資料都是不一樣的,這種情況顯然只能用分散式快取了,如果使用redis,key為使用者唯一鍵,value則是使用者資訊。
redis導致的快取擊穿。
但是需要注意一點,一定要設定過期時間,而且不能設定到同一時間點過期。舉個例子,比如使用者又個活動頁,活動頁能看到使用者活動期間獲獎資料,粗心的人可能會設定使用者資料的過期時間點為活動結束,這樣會
單(熱)點問題
單節點問題說的是redis的單個節點的並行問題,因為對於相同的key會落到redis叢集的同一個節點上,那麼如果對這個key的存取量過高,那麼這個redis節點就存在並行隱患,這個key就稱為熱key。
如果所有使用者存取的都是同一個資源,比如小愛同學app首頁對所有使用者展示的內容都一樣(初期),伺服器端給h5返回的是同一個大json,顯然得使用到快取。首先我們考慮下用redis是否可行,由於redis存在單點問題,如果流量過大的話,那麼所有使用者的請求到達redis的同一個節點,需要評估該節點能否抗住這麼大流量。我們的規則是,如果單節點qps達到了千級別就要解決單點問題了(即使redis號稱能抗住十萬級別的qps),最常見的做法就是使用本地快取。顯然小愛app首頁流量不過百,使用redis是沒問題的。
對於這上面說的熱key問題,我們最直接的做法就是使用本地快取,比如你最熟悉的guava的LoadingCache,但是使用本地快取要求能夠接受一定的髒資料,因為如果你更新了首頁,本地快取是不會更新的,它只會根據一定的過期策略來重新載入快取,不過在我們這個場景是完全沒問題的,因為一旦在後臺推播了首頁後就不會再去改變了。即使改變了也沒問題,可以設定寫過期為半小時,超過半小時重新載入快取,這種短時間內的髒資料我們是可以接受的。
LoadingCache導致的快取擊穿
雖然說本地快取和機器上強相關的,雖然程式碼層面寫的是半小時過期,但由於每臺機器的啟動時間不同,導致快取的載入時間不同,過期時間也就不同,也就不會所有機器上的請求在同一時間快取失效後都去請求資料庫。但是對於單一一臺機器也是會導致快取穿透的,假如有10臺機器,每臺1000的qps,只要有一臺快取過期就可能導致這1000個請求同時打到了資料庫。這種問題其實比較好解決,但是容易被忽略,也就是在設定LoadingCache的時候使用LoadingCache的load-miss方法,而不是直接判斷cache.getIfPresent()== null然後去請求db;前者會加虛擬機器器層面的鎖,保證只有一個請求打到資料庫去,從而完美的解決了這個問題。
但是,如果對於實時性要求較高的情況,比如有段時間要經常做活動,我要保證活動頁面能近實時更新,也就是運營在後臺設定好了活動資訊後,需要在C端近實時展示這次設定的活動資訊,此時使用LoadingCache肯定就不能滿足了。
對於上面說的LoadingCache不能解決的實時問題,可以考慮使用ReloadableCache,這是快手開源的一個本地快取框架,最大的特點是支援多機器同時更新快取,假設我們修改了首頁資訊,然後請求打到的是A機器,這個時候重新載入ReloadableCache,然後它會發出通知,監聽了同一zk節點的其他機器收到通知後重新更新快取。使用這個快取一般的要求是將全量資料載入到本地快取,所以如果資料量過大肯定會對gc造成壓力,這種情況就不能使用了。由於小愛同學首頁這個首頁是帶有狀態的,一般online狀態的就那麼兩個,所以完全可以使用ReloadableCache來只裝載online狀態的首頁。
到這裡三種快取基本都介紹完了,做個小結:
不管哪種本地快取雖然都帶有虛擬機器器層面的加鎖來解決擊穿問題,但是意外總有可能以你意想不到的方式發生,保險起見你可以使用兩級快取的方式即本地快取+redis+db。
這裡redis的使用就不再多說了,相信很多人對api的使用比我還熟悉
這個是guava提供的網上一抓一大把,但是給兩點注意事項
V get(K key, Callable<? extends V> loader)
;要麼使用build的時候使用的是build(CacheLoader<? super K1, V1> loader)
這個時候可以直接使用get()了。此外建議使用load-miss,而不是getIfPresent==null的時候再去查資料庫,這可能導致快取擊穿;LoadingCache<String, String> cache = CacheBuilder.newBuilder()
.maximumSize(1000L)
.expireAfterAccess(Duration.ofHours(1L)) // 多久不存取就過期
.expireAfterWrite(Duration.ofHours(1L)) // 多久這個key沒修改就過期
.build(new CacheLoader<String, String>() {
@Override
public String load(String key) throws Exception {
// 資料裝載方式,一般就是loadDB
return key + " world";
}
});
String value = cache.get("hello"); // 返回hello world
登入後複製
匯入三方依賴
<dependency>
<groupId>com.github.phantomthief</groupId>
<artifactId>zknotify-cache</artifactId>
<version>0.1.22</version>
</dependency>
登入後複製
需要看檔案,不然無法使用,有興趣自己寫一個也行的。
public interface ReloadableCache<T> extends Supplier<T> {
/**
* 獲取快取資料
*/
@Override
T get();
/**
* 通知全域性快取更新
* 注意:如果本地快取沒有初始化,本方法並不會初始化本地快取並重新載入
*
* 如果需要初始化本地快取,請先呼叫 {@link ReloadableCache#get()}
*/
void reload();
/**
* 更新本地快取的本地副本
* 注意:如果本地快取沒有初始化,本方法並不會初始化並重新整理原生的快取
*
* 如果需要初始化本地快取,請先呼叫 {@link ReloadableCache#get()}
*/
void reloadLocal();
}
登入後複製
這三個真的是亙古不變的問題,如果流量大確實需要考慮。
簡單說就是快取失效,導致大量請求同一時間打到了資料庫。對於快取擊穿問題上面已經給出了很多解決方案了。
1.2和都說過,主要來看3。假如業務願意只能使用redis而無法使用本地快取,比如資料量過大,實時性要求比較高。那麼當快取失效的時候就得想辦法保證只有少量的請求打到資料庫。很自然的就想到了使用分散式鎖,理論上說是可行的,但實際上存在隱患。我們的分散式鎖相信很多人都是使用redis+lua的方式實現的,並且在while中進行了輪訓,這樣請求量大,資料多的話會導致無形中讓redis成了隱患,並且佔了太多業務執行緒,其實僅僅是引入了分散式鎖就加大了複雜度,我們的原則就是能不用就不用。
那麼我們是不是可以設計一個類似分散式鎖,但是更可靠的rpc服務呢?當呼叫get方法的時候這個rpc服務保證相同的key打到同一個節點,並且使用synchronized來進行加鎖,之後完成資料的載入。在快手提供了一個叫cacheSetter的框架。下面提供一個簡易版,自己寫也很容易實現。
import com.google.common.collect.Lists;
import org.apache.commons.collections4.CollectionUtils;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
/**
* @Description 分散式載入快取的rpc服務,如果部署了多臺機器那麼呼叫端最好使用id做一致性hash保證相同id的請求打到同一臺機器。
**/
public abstract class AbstractCacheSetterService implements CacheSetterService {
private final ConcurrentMap<String, CountDownLatch> loadCache = new ConcurrentHashMap<>();
private final Object lock = new Object();
@Override
public void load(Collection<String> needLoadIds) {
if (CollectionUtils.isEmpty(needLoadIds)) {
return;
}
CountDownLatch latch;
Collection<CountDownLatch> loadingLatchList;
synchronized (lock) {
loadingLatchList = excludeLoadingIds(needLoadIds);
needLoadIds = Collections.unmodifiableCollection(needLoadIds);
latch = saveLatch(needLoadIds);
}
System.out.println("needLoadIds:" + needLoadIds);
try {
if (CollectionUtils.isNotEmpty(needLoadIds)) {
loadCache(needLoadIds);
}
} finally {
release(needLoadIds, latch);
block(loadingLatchList);
}
}
/**
* 加鎖
* @param loadingLatchList 需要加鎖的id對應的CountDownLatch
*/
protected void block(Collection<CountDownLatch> loadingLatchList) {
if (CollectionUtils.isEmpty(loadingLatchList)) {
return;
}
System.out.println("block:" + loadingLatchList);
loadingLatchList.forEach(l -> {
try {
l.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
/**
* 釋放鎖
* @param needLoadIds 需要釋放鎖的id集合
* @param latch 通過該CountDownLatch來釋放鎖
*/
private void release(Collection<String> needLoadIds, CountDownLatch latch) {
if (CollectionUtils.isEmpty(needLoadIds)) {
return;
}
synchronized (lock) {
needLoadIds.forEach(id -> loadCache.remove(id));
}
if (latch != null) {
latch.countDown();
}
}
/**
* 載入快取,比如根據id從db查詢資料,然後設定到redis中
* @param needLoadIds 載入快取的id集合
*/
protected abstract void loadCache(Collection<String> needLoadIds);
/**
* 對需要載入快取的id繫結CountDownLatch,後續相同的id請求來了從map中找到CountDownLatch,並且await,直到該執行緒載入完了快取
* @param needLoadIds 能夠正在去載入快取的id集合
* @return 公用的CountDownLatch
*/
protected CountDownLatch saveLatch(Collection<String> needLoadIds) {
if (CollectionUtils.isEmpty(needLoadIds)) {
return null;
}
CountDownLatch latch = new CountDownLatch(1);
needLoadIds.forEach(loadId -> loadCache.put(loadId, latch));
System.out.println("loadCache:" + loadCache);
return latch;
}
/**
* 哪些id正在載入資料,此時持有相同id的執行緒需要等待
* @param ids 需要載入快取的id集合
* @return 正在載入的id所對應的CountDownLatch集合
*/
private Collection<CountDownLatch> excludeLoadingIds(Collection<String> ids) {
List<CountDownLatch> loadingLatchList = Lists.newArrayList();
Iterator<String> iterator = ids.iterator();
while (iterator.hasNext()) {
String id = iterator.next();
CountDownLatch latch = loadCache.get(id);
if (latch != null) {
loadingLatchList.add(latch);
iterator.remove();
}
}
System.out.println("loadingLatchList:" + loadingLatchList);
return loadingLatchList;
}
}
登入後複製
業務實現
import java.util.Collection;
public class BizCacheSetterRpcService extends AbstractCacheSetterService {
@Override
protected void loadCache(Collection<String> needLoadIds) {
// 讀取db進行處理
// 設定快取
}
}
登入後複製
簡單來說就是請求的資料在資料庫不存在,導致無效請求打穿資料庫。
解法也很簡單,從db獲取資料的方法(getByKey(K key))一定要給個預設值。
比如我有個獎池,金額上限是1W,使用者完成任務的時候給他發筆錢,並且使用redis記錄下來,並且落表,使用者在任務頁面能實時看到獎池剩餘金額,在任務開始的時候顯然獎池金額是不變的,redis和db裡面都沒有發放金額的記錄,這就導致每次必然都去查db,對於這種情況,從db沒查出來資料應該快取個值0到快取。
就是大量快取集中失效打到了db,當然肯定都是一類的業務快取,歸根到底是程式碼寫的有問題。可以將快取失效的過期時間打散,別讓其集中失效就可以了。
推薦學習:
以上就是高並行技巧之Redis和本地快取使用技巧分享的詳細內容,更多請關注TW511.COM其它相關文章!