Redis實現布隆過濾器解析

2022-10-07 06:01:01

布隆過濾器原理介紹

  【1】概念說明

    1)布隆過濾器(Bloom Filter)是1970年由布隆提出的。它實際上是一個很長的二進位制向量和一系列隨機對映函數。布隆過濾器可以用於檢索一個元素是否在一個集合中。它的優點是空間效率和查詢時間都遠遠超過一般的演演算法,缺點是有一定的誤識別率和刪除困難。

  【2】設計思想

    1)BF是由一個長度為m位元的位陣列(bit array)與k個雜湊函數(hash function)組成的資料結構。位陣列均初始化為0,所有雜湊函數都可以分別把輸入資料儘量均勻地雜湊。

    2)當要插入一個元素時,將其資料分別輸入k個雜湊函數,產生k個雜湊值。以雜湊值作為位陣列中的下標,將所有k個對應的位元置為1。

    3)當要查詢(即判斷是否存在)一個元素時,同樣將其資料輸入雜湊函數,然後檢查對應的k個位元。如果有任意一個位元為0,表明該元素一定不在集合中。如果所有位元均為1,表明該集合有(較大的)可能性在集合中。為什麼不是一定在集合中呢?因為一個位元被置為1有可能會受到其他元素的影響,這就是所謂「假陽性」(false positive)。相對地,「假陰性」(false negative)在BF中是絕不會出現的。

  【3】圖示

          

  【4】優缺點

    1)優點

      1.不需要儲存資料本身,只用位元表示,因此空間佔用相對於傳統方式有巨大的優勢,並且能夠保密資料;

      2.時間效率也較高,插入和查詢的時間複雜度均為O(k);

      3.雜湊函數之間相互獨立,可以在硬體指令層面平行計算。

 

    2)缺點

      1.存在假陽性的概率,不適用於任何要求100%準確率的情境;

      2.只能插入和查詢元素,不能刪除元素,這與產生假陽性的原因是相同的。我們可以簡單地想到通過計數(即將一個位元擴充套件為計數值)來記錄元素數,但仍然無法保證刪除的元素一定在集合中。(因此只能進行重建

 

guava框架如何實現布隆過濾器

  【1】引入依賴

<dependency>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>28.0-jre</version>
</dependency>

 

  【2】簡單使用

//布隆過濾器-數位指紋儲存在當前jvm當中
public class LocalBloomFilter {

    private static final BloomFilter<String> bloomFilter = BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8),1000000,0.01);
    /**
     * 谷歌guava布隆過濾器
     * @param id
     * @return
     */
    public static boolean match(String id){
        return bloomFilter.mightContain(id);
    }

    public static void put(Long id){
        bloomFilter.put(id+"");
    }
}

 

  【3】原始碼分析(由上面的三個主要方法看起,create方法,mightContain方法,put方法)

    1)create方法深入分析

@VisibleForTesting
static <T> BloomFilter<T> create(Funnel<? super T> funnel, long expectedInsertions, double fpp, Strategy strategy) {
    //檢測序列化器
    checkNotNull(funnel);
    //檢測儲存容量
    checkArgument(expectedInsertions >= 0, "Expected insertions (%s) must be >= 0", expectedInsertions);
    //容錯率應該在0-1之前
    checkArgument(fpp > 0.0, "False positive probability (%s) must be > 0.0", fpp);
    checkArgument(fpp < 1.0, "False positive probability (%s) must be < 1.0", fpp);
    //檢測策略
    checkNotNull(strategy);

    if (expectedInsertions == 0) {
      expectedInsertions = 1;
    }

    // 這裡numBits即底下LockFreeBitArray位陣列的長度,可以看到計算方式就是外部傳入的期待數和容錯率
    long numBits = optimalNumOfBits(expectedInsertions, fpp);
    int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
    try {
      return new BloomFilter<T>(new BitArray(numBits), numHashFunctions, funnel, strategy);
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException("Could not create BloomFilter of " + numBits + " bits", e);
    }
}

private BloomFilter(BitArray bits, int numHashFunctions, Funnel<? super T> funnel, Strategy strategy) {
    //檢測hash函數個數應該在0-255之間
    checkArgument(numHashFunctions > 0, "numHashFunctions (%s) must be > 0", numHashFunctions);
    checkArgument(numHashFunctions <= 255, "numHashFunctions (%s) must be <= 255", numHashFunctions);
    this.bits = checkNotNull(bits);
    this.numHashFunctions = numHashFunctions;
    this.funnel = checkNotNull(funnel);
    this.strategy = checkNotNull(strategy);
}


//計算容量大小
@VisibleForTesting
static long optimalNumOfBits(long n, double p) {
    if (p == 0) {
      p = Double.MIN_VALUE;
    }
    return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
}

//計算滿足條件時,應進行多少次hash函數
@VisibleForTesting
static int optimalNumOfHashFunctions(long n, long m) {
    // (m / n) * log(2), but avoid truncation due to division!
    return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
}

 

    2)mightContain方法深入分析

public boolean mightContain(T object) {
    return strategy.mightContain(object, funnel, numHashFunctions, bits);
}

public <T> boolean mightContain(T object, Funnel<? super T> funnel, int numHashFunctions, BloomFilterStrategies.BitArray bits) {
    long bitSize = bits.bitSize();
    long hash64 = Hashing.murmur3_128().hashObject(object, funnel).asLong();
    int hash1 = (int)hash64;
    int hash2 = (int)(hash64 >>> 32);

    for(int i = 1; i <= numHashFunctions; ++i) {
        int combinedHash = hash1 + i * hash2;
        if (combinedHash < 0) {
            combinedHash = ~combinedHash;
        }

        if (!bits.get((long)combinedHash % bitSize)) {
            return false;
        }
    }

    return true;
}

 

    3)put方法深入分析

@CanIgnoreReturnValue
public boolean put(T object) {
    return strategy.put(object, funnel, numHashFunctions, bits);
}

//策略實現填入bits
public <T> boolean put(T object, Funnel<? super T> funnel, int numHashFunctions, BloomFilterStrategies.BitArray bits) {
    long bitSize = bits.bitSize();
    long hash64 = Hashing.murmur3_128().hashObject(object, funnel).asLong();
    int hash1 = (int)hash64;
    int hash2 = (int)(hash64 >>> 32);
    boolean bitsChanged = false;

    for(int i = 1; i <= numHashFunctions; ++i) {
        int combinedHash = hash1 + i * hash2;
        if (combinedHash < 0) {
            combinedHash = ~combinedHash;
        }

        bitsChanged |= bits.set((long)combinedHash % bitSize);
    }

    return bitsChanged;
}

 

採用Redis實現布隆過濾器

  【1】抽出guava框架中部分核心邏輯方法形成工具類

/**
 * 演演算法過程:
 * 1. 首先需要k個hash函數,每個函數可以把key雜湊成為1個整數
 * 2. 初始化時,需要一個長度為n位元的陣列,每個位元位初始化為0
 * 3. 某個key加入集合時,用k個hash函數計算出k個雜湊值,並把陣列中對應的位元位置為1
 * 4. 判斷某個key是否在集合時,用k個hash函數計算出k個雜湊值,並查詢陣列中對應的位元位,如果所有的位元位都是1,認為在集合中。
 * @description: 布隆過濾器,摘錄自Google-guava包
 **/
public class BloomFilterHelper<T> {
    private int numHashFunctions;

    private int bitSize;

    private Funnel<T> funnel;

    public BloomFilterHelper(Funnel<T> funnel, int expectedInsertions, double fpp) {
        Preconditions.checkArgument(funnel != null, "funnel不能為空");
        this.funnel = funnel;
        // 計算bit陣列長度
        bitSize = optimalNumOfBits(expectedInsertions, fpp);
        // 計算hash方法執行次數
        numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, bitSize);
    }

    public int[] murmurHashOffset(T value) {
        int[] offset = new int[numHashFunctions];

        //有點類似於hashmap中採用高32位元與低32位元相與獲得hash值
        long hash64 = Hashing.murmur3_128().hashObject(value, funnel).asLong();
        int hash1 = (int) hash64;
        int hash2 = (int) (hash64 >>> 32);
        //採用對低32進行變更以達到隨機雜湊函數的效果
        for (int i = 1; i <= numHashFunctions; i++) {
            int nextHash = hash1 + i * hash2;
            if (nextHash < 0) {
                nextHash = ~nextHash;
            }
            offset[i - 1] = nextHash % bitSize;
        }
        return offset;
    }

    /**
     * 計算bit陣列長度
     * Math.log(2) = 0.6931471805599453;(取0.693147來用)
     * (Math.log(2) * Math.log(2)) = 0.48045237;
     * 假設傳入n為1,000,000  , p為0.01;
     * Math.log(0.01) = -4.605170185988091;
     * 則返回值為9,585,071 ,即差不多是預設容量的10倍
     * 
     * 要知道 1MB = 1024KB , 1KB = 1024B ,1B=8bit。
     * 也就是對一百萬資料預計花費的記憶體為1.143MB的記憶體
     */
    private int optimalNumOfBits(long n, double p) {
        if (p == 0) {
            // 設定最小期望長度
            p = Double.MIN_VALUE;
        }
        return (int) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    /**
     * 計算hash方法執行次數
     */
    private int optimalNumOfHashFunctions(long n, long m) {
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }
}

 

  【2】構建Redis實現布隆過濾器的服務類

public class BloomRedisService {
    private RedisTemplate<String, Object> redisTemplate;
    private BloomFilterHelper bloomFilterHelper;

    public void setBloomFilterHelper(BloomFilterHelper bloomFilterHelper) {
        this.bloomFilterHelper = bloomFilterHelper;
    }
    public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 根據給定的布隆過濾器新增值
     * 這裡可以考慮LUA指令碼進行優化,減少傳輸次數
     * 如 eval "redis.call('setbit',KEYS[1],ARGV[1],1) redis.call('setbit',KEYS[1],ARGV[2],1) " 1 mybool  243 5143 
     * 但是又需要衡量操作的時間,與如果次數很多導致的傳輸的資料量很大容易阻塞問題
     */
    public <T> void addByBloomFilter(String key, T value) {
        Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper不能為空");
        int[] offset = bloomFilterHelper.murmurHashOffset(value);
        for (int i : offset) {
            redisTemplate.opsForValue().setBit(key, i, true);
        }
    }

    /**
     * 根據給定的布隆過濾器判斷值是否存在
     */
    public <T> boolean includeByBloomFilter(String key, T value) {
        Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper不能為空");
        int[] offset = bloomFilterHelper.murmurHashOffset(value);
        for (int i : offset) {
            if (!redisTemplate.opsForValue().getBit(key, i)) {
                return false;
            }
        }
        return true;
    }
}

 

 

  【3】編輯設定類

@Slf4j
@Configuration
public class BloomFilterConfig implements InitializingBean{

    @Autowired
    private PmsProductService productService;

    @Autowired
    private RedisTemplate template;

    @Bean
    public BloomFilterHelper<String> initBloomFilterHelper() {
        return new BloomFilterHelper<>((Funnel<String>) (from, into) -> into.putString(from, Charsets.UTF_8)
                .putString(from, Charsets.UTF_8), 1000000, 0.01);
    }

    // 布隆過濾器bean注入
    @Bean
    public BloomRedisService bloomRedisService(){
        BloomRedisService bloomRedisService = new BloomRedisService();
        bloomRedisService.setBloomFilterHelper(initBloomFilterHelper());
        bloomRedisService.setRedisTemplate(template);
        return bloomRedisService;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        List<Long> list = productService.getAllProductId();
        log.info("載入產品到布隆過濾器當中,size:{}",list.size());
        if(!CollectionUtils.isEmpty(list)){
            list.stream().forEach(item->{
                //LocalBloomFilter.put(item);
                bloomRedisService().addByBloomFilter(RedisKeyPrefixConst.PRODUCT_REDIS_BLOOM_FILTER,item+"");
            });
        }
    }
}

 

  【4】構建布隆過濾器的攔截器

//攔截器,所有需要檢視商品詳情的請求必須先過布隆過濾器
@Slf4j
public class BloomFilterInterceptor implements HandlerInterceptor {

    @Autowired
    private BloomRedisService bloomRedisService;

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        String currentUrl = request.getRequestURI();
        PathMatcher matcher = new AntPathMatcher();
        //解析出pathvariable
        Map<String, String> pathVariable = matcher.extractUriTemplateVariables("/pms/productInfo/{id}", currentUrl);
        //布隆過濾器儲存在redis中
        if(bloomRedisService.includeByBloomFilter(RedisKeyPrefixConst.PRODUCT_REDIS_BLOOM_FILTER,pathVariable.get("id"))){
            return true;
        }

        /*
         * 不在布隆過濾器當中,直接返回驗證失敗
         * 設定響應頭
         */
        response.setHeader("Content-Type","application/json");
        response.setCharacterEncoding("UTF-8");
        String result = new ObjectMapper().writeValueAsString(CommonResult.validateFailed("產品不存在!"));
        response.getWriter().print(result);
        return false;
    }

}

 

  【5】將攔截器註冊進SpringMVC中

@Configuration
public class IntercepterConfiguration implements WebMvcConfigurer {

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        //註冊攔截器
        registry.addInterceptor(authInterceptorHandler())
                .addPathPatterns("/pms/productInfo/**");
    }

    @Bean
    public BloomFilterInterceptor authInterceptorHandler(){
        return new BloomFilterInterceptor();
    }
}