简介

guava是google出品的一个功能十分强大的工具包,Cache是其中一个非常有用的功能,多线程并发访问时,只有一个线程负责去获取数据,能防大大量请求直接请求后端的DB或其他资源,减少缓存穿透造成的影响。本篇文章介绍本地缓存Cache的一些实践经验。

缓存过期策略

  • expireAfterAccess: 在指定的过期时间内没有读写,缓存数据即失效
  • expireAfterWrite: 在指定的过期时间内没有写入,缓存数据即失效
  • refreshAfterWrite: 在指定的过期时间之后访问时,刷新缓存数据,在刷新任务未完成之前,其他线程返回旧值

expireAfterAccess和expireAfterWrite比较容易理解,一个是多久没有访问就过期,另一个是多久没有写入就过期,是比较常见的策略。

了解清楚refreshAfterWrite的机制,有助于我们用好Cache,下面通过一个例子来说明

refreshAfterWrite示例

public class RefreshAfterWriteTest {

    private StopWatch stopWatch;

    @Test
    public void test() throws InterruptedException {
        stopWatch = new StopWatch();
        stopWatch.start();
        LoadingCache<Integer, String> cache = CacheBuilder.newBuilder()
                .refreshAfterWrite(5, TimeUnit.SECONDS).build(new CacheLoader<Integer, String>() {
                    @Override
                    public String load(Integer key) throws Exception {
                        return queryData(key);
                    }
                });
        Thread thread1 = startLoadingCacheQuery("client1", cache);
        Thread thread2 = startLoadingCacheQuery("client2", cache);
        thread1.join();
        thread2.join();
        Thread thread3 = startLoadingCacheQuery("client3", cache);
        Thread thread4 = startLoadingCacheQuery("client4", cache);
        thread3.join();
        thread4.join();
        Thread.sleep(10000);
        Thread thread5 = startLoadingCacheQuery("client5", cache);
        Thread thread6 = startLoadingCacheQuery("client6", cache);
        thread5.join();
        thread6.join();
    }

    private String queryData(Integer key) throws InterruptedException {
        log("queryData start");
        Thread.sleep(3000);
        log("queryData end");
        return key.toString();
    }

    private Thread startLoadingCacheQuery(String clientName, LoadingCache<Integer, String> cache) {
        Thread thread = new Thread(() -> {
            log("get start");
            try {
                cache.get(1);
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            log("get end");
        });
        thread.setName(clientName);
        thread.start();
        return thread;
    }

    private void log(String msg) {
        System.out.println(String.format("%ds %s %s", stopWatch.getTime() / 1000, Thread.currentThread().getName(), msg));
    }
}

输出结果

0s client2 get start
0s client1 get start
0s client2 queryData start
3s client2 queryData end
3s client1 get end
3s client2 get end
3s client3 get start
3s client3 get end
3s client4 get start
3s client4 get end
13s client5 get start
13s client6 get start
13s client5 queryData start
13s client6 get end
16s client5 queryData end
16s client5 get end

分析

client1和client2并发请求的情况

可以看到,只有client2一个线程去执行真正的数据查询,client1处于阻塞状态,等到查询完成之后,一起返回查询结果

client3和client4并发请求(缓存未过期)的情况

可以看到,client3和client4都命中缓存,直接返回了数据

client5和client6并发请求(缓存已过期)的情况

可以看到,client5去执行真正的数据查询,而client6直接返回了缓存的旧值。refreshAfterWrite和expireAfterWrite的区别就在这里,一个返回旧值,另一个阻塞等待新值

最佳实践

不管使用哪一种缓存过期策略,guava cache都会帮我们确保,同一个key,同时只有一个线程去执行刷新,避免了热点key的大量请求给后端造成的性能压力。但是这样还不够。

expireAfterAccess和expireAfterWrite在缓存过期后,由一个请求去执行后端查询,其他请求都在阻塞等待结果返回,如果同时有大量的请求阻塞,那么可能会产生大影响。

refreshAfterWrite返回旧值的处理方式解决了大量线程阻塞等待的问题,但是返回的旧值可能是已存在于缓存中很长时间的值,对时效性要求高的场景可能会造成非常大的错误。

实践一:expireAfterWrite和refreshAfterWrite结合使用,expire的时间要比refresh的长

这样的组合,既避免了大量线程阻塞等待数据更新的问题,也可以保证数据的有效期在合理的范围内,不会出现过期很久的数据。

expire的时间如果比refresh的短,可以想象,旧值在expire到达的时间就失效移除了,refresh根本就不会触发,因为没有旧值可以返回,所有的线程就都要阻塞等待新值到来。我们可以考虑3:1到5:!这样的时间分配比例,比如说expire时间为5分钟,refresh时间设为1分钟,这是一个经验值。

实践二:使用reload来执行异步刷新数据

按照refreshAfterWrite的功能说明,在数据超过refresh设置的期限后,并发请求中,有一个线程要去执行数据刷新任务,其他线程可以返回旧值。那么,为什么是我去执行刷新,其他兄弟都毫不停留马上就去送外卖了。实际上,刷新数据的任务可以交给后台线程去做,请求线程都可以马上返回。在expireAfterWrite和refreshAfterWrite的组合情况下,可以达到非阻塞的效果,这意味着请求都只需要访问本地缓存,无需IO等待。当然,如果缓存中不存在旧值,这时候是肯定需要等待第一次加载数据的。这样做可以保证热点key的访问一直处于非阻塞状态。

示例

public class ReloadTest {

    private StopWatch stopWatch;

    @Test
    public void test() throws InterruptedException {
        stopWatch = new StopWatch();
        stopWatch.start();
        ExecutorService executor = new ThreadPoolExecutor(8, 32, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100));
        LoadingCache<Integer, String> cache = CacheBuilder.newBuilder()
                .refreshAfterWrite(1, TimeUnit.SECONDS).build(new CacheLoader<Integer, String>() {
                    @Override
                    public String load(Integer key) throws Exception {
                        return queryData(key);
                    }

                    @Override
                    public ListenableFuture<String> reload(Integer key, String oldValue) throws Exception {
                        ListenableFutureTask<String> task = ListenableFutureTask.create(() -> load(key + 1));
                        executor.execute(task);
                        return task;
                    }
                });
        Thread thread7 = startLoadingCacheQuery("client7", cache);
        Thread thread8 = startLoadingCacheQuery("client8", cache);
        thread7.join();
        thread8.join();
        Thread.sleep(3000);
        Thread thread9 = startLoadingCacheQuery("client9", cache);
        Thread thread10 = startLoadingCacheQuery("client10", cache);
        thread9.join();
        thread10.join();
        Thread.sleep(5000);
    }

    private String queryData(Integer key) throws InterruptedException {
        log("queryData start");
        Thread.sleep(3000);
        log("queryData end");
        return key.toString();
    }

    private Thread startLoadingCacheQuery(String clientName, LoadingCache<Integer, String> cache) {
        Thread thread = new Thread(() -> {
            log("get start");
            try {
                cache.get(1);
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            log("get end");
        });
        thread.setName(clientName);
        thread.start();
        return thread;
    }

    private void log(String msg) {
        System.out.println(String.format("%ds %s %s", stopWatch.getTime() / 1000, Thread.currentThread().getName(), msg));
    }
}

输出结果

0s client7 get start
0s client8 get start
0s client8 queryData start
3s client8 queryData end
3s client7 get end
3s client8 get end
6s client9 get start
6s client10 get start
6s client10 get end
6s pool-1-thread-1 queryData start
6s client9 get end
9s pool-1-thread-1 queryData end

可以看到,client7和client8并发执行时,由于缓存中没有旧值,所以必须要有一个线程去执行数据获取,其他线程也处于阻塞状态。

client9与client10并发访问时,虽然过了refresh的时间,但是他们都优先返回了旧值,由线程池里的线程去执行数据刷新。

实践三:服务启动时预热缓存

从上面的示例可以看到,刚开始的时候,由于缓存中没有任何数据,所有线程都需要阻塞等待初始化。造成的结果就是刚启动的服务,看起来是卡顿的,过了一段时间之后,服务稳定状态的表现就要好得多。

我们可以通过预热缓存来解决这个问题,将可能的热点数据先尝试加载,完成之后,再将服务暴露出去。