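Why build a multi-level cache
The usual caching approach is to put Redis, a cache middleware, in front of the database. A single Redis node handles on the order of tens of thousands of QPS, which is enough for most workloads, so an extra cache layer is usually unnecessary. We built a multi-level cache anyway, for the following reasons:
- Redis is a shared service and can fail in unexpected ways, for example when some nodes become unavailable or the whole cluster goes down. When that happens, can the product keep working? Can we depend on Redis only partially?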
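- At its core, a multi-level cache reconciles cost with performance. The best-known example is the CPU cache hierarchy: registers -> L1 cache -> L2 cache -> L3 cache. Register sizes are measured in bits (commonly 64 or 128), L1 caches are typically tens of KB, and L2/L3 caches range from hundreds of KB to tens of MB, whereas main memory (RAM modules) is measured in GB and is far cheaper per byte. Software systems usually have even more layers, all aimed at faster access at lower cost. A typical consumer-facing (ToC) platform layers its caches as: browser cache -> nginx local cache -> in-process cache -> Redis cache -> database cache.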
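- A multi-level cache is a general-purpose solution that fits a wider range of QPS. It does add development cost, but the payoff is considerable.
Problems with multi-level caching
Multiple cache layers bring a set of problems. We avoid caching data that is updated frequently; when that cannot be avoided, we have to decide how strict the consistency requirement is and which cache update strategy fits best.
- Caches are split into dynamic and static. Dynamic caches have an eviction policy and support updates; static caches stay loaded in process memory permanently and hold fixed values.
- Cache updates cascade. A database update must refresh both Redis and Caffeine, and a Redis update must refresh Caffeine. On a Caffeine miss, a value found in Redis is written back to Caffeine, and an update to the local Caffeine cache is broadcast to the other service nodes.
- Each cache level has its own eviction policy, to keep cached data as fresh and consistent as possible.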

How it works

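There are two ways to implement this. The first is a custom aspect plus a custom annotation: every call to an annotated method is intercepted, and the relevant values are resolved through SpEL expressions and then cached.
That approach is fairly laborious. A better option is to keep using Spring's official caching framework, Spring Cache, so that the usual annotations (@Cacheable, @CacheEvict, and so on) stay unchanged. Reading the source shows that Spring Cache is built around a CacheManager that composes Cache instances. Both CacheManager and Cache are interfaces, and default implementations are provided for common backends such as Redis and Caffeine.
A CacheManager is the component that manages Cache implementations in one place. RedisCacheManager, for example, mainly manages the default RedisCache implementation, while the Caffeine CacheManager keeps its caches in a map (private final Map<String, Cache> cacheMap = new ConcurrentHashMap(16)); the underlying Caffeine caches include BoundedLocalLoadingCache and UnboundedLocalLoadingCache.
A custom cache manager
Following the same pattern, we implement our own cache manager, CaffeineRedisCacheManager, which can freely combine several caches, Redis and Caffeine among them, and which exposes CaffeineRedisCache, our custom extension of Cache.
Core methods of the custom cache manager: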
public class CaffeineRedisCacheManager implements ICaffeineRedisCacheManager {
private Map<String, Cache> cacheMap = new ConcurrentHashMap<>();
private Set<String> cacheNames;
private boolean dynamic;
private CacheConfigProperties cacheConfigProperties;
private RedissonClient redissonClient;
private RedisTemplate<String, Object> redisTemplate;
private CacheLoader<Object, Object> cacheLoader;
private String serverId;
public CaffeineRedisCacheManager(CacheConfigProperties cacheConfigProperties, RedisTemplate<String, Object> redisTemplate, CacheLoader<Object, Object> cacheLoader) {
super();
this.cacheConfigProperties = cacheConfigProperties;
this.redisTemplate = redisTemplate;
this.serverId = cacheConfigProperties.getServerId();
this.dynamic = cacheConfigProperties.isDynamic();
this.cacheNames = cacheConfigProperties.getCacheNames();
this.cacheLoader = cacheLoader;
}
@Override
@SuppressWarnings("unchecked")
public <K, V> com.github.benmanes.caffeine.cache.Cache<K, V> getCaffeineCache(String name) {
return (com.github.benmanes.caffeine.cache.Cache<K, V>) getCache(name);
}
@Override
public org.springframework.cache.Cache getCache(@NotNull String name) {
Cache cache = cacheMap.get(name);
if (Objects.nonNull(cache)) {
return cache;
}
if (!dynamic && !cacheNames.contains(name)) {
return null;
}
// computeIfAbsent is atomic on ConcurrentHashMap, so each named cache is created
// at most once; a ReentrantLock created per call would not guard anything
return cacheMap.computeIfAbsent(name, n -> {
log.info("Creating new cache instance, name===>{}", n);
return createCaffeineRedisCache(n, cacheLoader);
});
}
private Cache createCaffeineRedisCache(String name, CacheLoader<Object, Object> cacheLoader) {
return new CaffeineRedisCache(name, redisTemplate, caffeineCache(cacheLoader), redissonClient, cacheConfigProperties);
}
public com.github.benmanes.caffeine.cache.Cache<Object, Object> caffeineCache(CacheLoader<Object, Object> cacheLoader) {
return caffeineCacheBuilder().build(cacheLoader);
}
}
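The two key pieces are the code that constructs the Cache and the getCache method. getCache can later be extended to return other custom Cache implementations; for now it only combines Caffeine and Redis.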
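The lazy, name-based resolution that getCache performs can be sketched without Spring, Caffeine, or Redis. The class below is a hypothetical stand-in, not the author's manager: NamedCacheRegistry and its factory parameter are names invented for illustration, while the dynamic flag and the cacheNames set mirror the fields of the same name in the listing above.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical stand-in for a cache manager: resolves named caches and creates them lazily. */
class NamedCacheRegistry<C> {
    private final Map<String, C> cacheMap = new ConcurrentHashMap<>();
    private final Set<String> cacheNames;        // names declared up front
    private final boolean dynamic;               // may undeclared names be created on demand?
    private final Function<String, C> factory;   // builds the cache for a given name

    NamedCacheRegistry(Set<String> cacheNames, boolean dynamic, Function<String, C> factory) {
        this.cacheNames = cacheNames;
        this.dynamic = dynamic;
        this.factory = factory;
    }

    /** Returns the cache for the name, or null when the name is unknown and dynamic creation is off. */
    C getCache(String name) {
        C existing = cacheMap.get(name);
        if (existing != null) {
            return existing;
        }
        if (!dynamic && !cacheNames.contains(name)) {
            return null;
        }
        // computeIfAbsent invokes the factory at most once per name, even under contention
        return cacheMap.computeIfAbsent(name, factory);
    }
}
```

With dynamic set to false, only declared names resolve to a cache, and asking twice for the same name returns the same instance.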
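A custom CaffeineRedisCache
Next comes the custom Cache implementation, CaffeineRedisCache. The Cache interface and its abstract adapter declare the methods that have to be implemented, such as lookup, get, put, evict, clear, and invalidate.
The key methods need to be extended. For example, lookup is implemented as: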
@Override
protected Object lookup(@NotNull Object key) {
Object cacheKey = getKey(key);
Object value = getCaffeineValue(key);
if (Objects.nonNull(value)) {
log.debug("Cache hit in caffeine, key===>{}", cacheKey);
return value;
}
value = getRedisValue(key);
if (Objects.nonNull(value)) {
log.debug("Cache hit in redis, key===>{}", cacheKey);
setCaffeineValue(key, value);
}
return value;
}
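A lookup reads the local Caffeine cache first and falls back to Redis on a miss. If the value is in Redis but not in Caffeine, a copy is stored in Caffeine. If neither level has it, lookup returns null, the data is read from the database, and the result is written back into the cache.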
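The read path described here can be reduced to a small sketch. It is only an illustration under simplifying assumptions: two in-memory maps stand in for Caffeine and Redis, and TwoLevelLookup and the loader parameter are hypothetical names that do not appear in the original code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Toy two-level cache: `local` stands in for Caffeine, `remote` for Redis. */
class TwoLevelLookup {
    final Map<String, Object> local = new ConcurrentHashMap<>();
    final Map<String, Object> remote = new ConcurrentHashMap<>();

    /** Local first, then remote with backfill; null means both levels missed. */
    Object lookup(String key) {
        Object value = local.get(key);
        if (value != null) {
            return value;
        }
        value = remote.get(key);
        if (value != null) {
            local.put(key, value);   // backfill the local level for later reads
        }
        return value;
    }

    /** Read-through: on a double miss, load from the source of truth and fill both levels. */
    Object get(String key, Function<String, Object> loader) {
        Object value = lookup(key);
        if (value != null) {
            return value;
        }
        value = loader.apply(key);   // e.g. a database query
        if (value != null) {
            remote.put(key, value);
            local.put(key, value);
        }
        return value;
    }
}
```

The loader plays the role of the annotated method body that Spring Cache invokes when the cache has no value.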
The putAll method:
@Override
public void putAll(@NonNull Map<?, ?> map) {
redisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
@SuppressWarnings("unchecked")
public <K, V> Object execute(@NonNull RedisOperations<K, V> operations) throws DataAccessException {
ValueOperations<String, Object> valueOperations = (ValueOperations<String, Object>) operations
.opsForValue();
map.forEach((k, v) -> {
Object o = toStoreValue(v);
Duration expire = getExpire(o);
setRedisValue(k, o, expire, valueOperations);
setCaffeineValue(k, o);
});
return null;
}
});
push(new ArrayList<>(map.keySet()), CACHE_OPERATE_ENUM.BATCH_EVICT);
}
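The put family of methods performs updates: the value is written to Redis and to the local Caffeine cache, and push then broadcasts the change to the other service nodes over Redis pub/sub so that they evict their stale local entries and reload them on the next read.
Pub/sub listener configuration: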
@Slf4j
@Getter
@RequiredArgsConstructor
public class CacheMessageSubscription implements MessageListener {
private final RedisSerializer<Object> redisSerializer;
private final CaffeineRedisCacheManager caffeineRedisCacheManager;
@Override
public void onMessage(Message message, byte[] pattern) {
CacheMessageTO cacheMessageTO = (CacheMessageTO) redisSerializer.deserialize(message.getBody());
assert cacheMessageTO != null;
log.info("Received redis pub/sub message===>{}", cacheMessageTO);
if (!Objects.equals(cacheMessageTO.getServerId(), caffeineRedisCacheManager.getServerId())) {
log.info("Received local cache update request, cacheName==>{}, operation===>{}, key===>{}", cacheMessageTO.getCacheName(), cacheMessageTO.getCacheOperate(), cacheMessageTO.getKey());
caffeineRedisCacheManager.clearLocal(cacheMessageTO.getCacheName(), cacheMessageTO.getKey(), cacheMessageTO.getCacheOperate());
}
}
}
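The listener checks whether the eviction was issued by this node. If it was, the message is skipped to avoid repeating the operation; otherwise clearLocal is called to update the local Caffeine cache.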
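The sender check is easier to see in isolation. The sketch below replaces Redis pub/sub with a hypothetical in-memory InvalidationBus and models each service instance as a Node holding a plain map; both classes are invented for illustration and only the serverId comparison corresponds to the listener above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for a Redis pub/sub channel. */
class InvalidationBus {
    private final List<Node> subscribers = new ArrayList<>();

    void subscribe(Node node) {
        subscribers.add(node);
    }

    /** Delivers the message to every subscriber, including the sender. */
    void publish(String senderId, String key) {
        for (Node node : subscribers) {
            node.onMessage(senderId, key);
        }
    }
}

/** One service instance with its own local cache. */
class Node {
    final String serverId;
    final Map<String, Object> local = new ConcurrentHashMap<>();
    private final InvalidationBus bus;

    Node(String serverId, InvalidationBus bus) {
        this.serverId = serverId;
        this.bus = bus;
        bus.subscribe(this);
    }

    /** Writer side: update the local copy, then announce that the key changed. */
    void put(String key, Object value) {
        local.put(key, value);
        bus.publish(serverId, key);
    }

    /** Receiver side: ignore messages this node sent itself, otherwise drop the stale entry. */
    void onMessage(String senderId, String key) {
        if (serverId.equals(senderId)) {
            return;
        }
        local.remove(key);
    }
}
```

Without the serverId check, the writing node would evict the value it has just stored and pay for an extra Redis read on its next lookup.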
Core clearLocal method:
@SuppressWarnings("unchecked")
public void clearLocal(String cacheName, Object key, CACHE_OPERATE_ENUM operation) {
Cache cache = cacheMap.get(cacheName);
if (cache == null) {
return;
}
CaffeineRedisCache redisCaffeineCache = (CaffeineRedisCache) cache;
if (CACHE_OPERATE_ENUM.BATCH_EVICT.equals(operation)) {
redisCaffeineCache.clearLocalBatch((Iterable<Object>) key);
} else {
redisCaffeineCache.clearLocal(key);
}
}
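Eviction policies for Redis and Caffeine
One shortcoming of Spring Cache is that it cannot set the expiration of individual keys dynamically. With Redis as the cache, entries are usually written through @Cacheable and invalidated through @CacheEvict. To control Redis expiration, I added an extended configuration that supports a custom expiration per key group, that is, per cache name (private final Map<String, Duration> expires). Each Redis key group then has its own expiration, and a group without an explicit setting defaults to no expiration.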
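A minimal sketch of this per-group resolution follows, loosely modeled on the getExpire and setRedisValue methods shown later in the article. ExpireResolver and hasTtl are hypothetical names, and treating a zero or negative Duration as "no expiration" is taken from the check in setRedisValue.

```java
import java.time.Duration;
import java.util.Map;

/** Resolves the Redis TTL for a cache name, with a default and a separate TTL for null values. */
class ExpireResolver {
    private final Map<String, Duration> expires;     // per cache name overrides
    private final Duration defaultExpiration;        // used when no override exists
    private final Duration nullValuesExpiration;     // optional shorter TTL for cached nulls

    ExpireResolver(Map<String, Duration> expires, Duration defaultExpiration, Duration nullValuesExpiration) {
        this.expires = expires;
        this.defaultExpiration = defaultExpiration;
        this.nullValuesExpiration = nullValuesExpiration;
    }

    Duration resolve(String cacheName, Object value) {
        Duration ttl = expires.getOrDefault(cacheName, defaultExpiration);
        if (value == null && nullValuesExpiration != null) {
            // cached "not found" markers usually deserve a shorter lifetime
            ttl = nullValuesExpiration;
        }
        return ttl;
    }

    /** A zero or negative duration means the key is stored without expiration. */
    static boolean hasTtl(Duration ttl) {
        return !ttl.isNegative() && !ttl.isZero();
    }
}
```

A caller would pass the resolved Duration to the Redis SET call only when hasTtl returns true, and store the key without expiration otherwise.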
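Caffeine supports more eviction policies, for example:
expire-after-access: expire a fixed time after the last access
expire-after-write: expire a fixed time after the last write
refresh-after-write: automatically refresh the entry a fixed time after it was written
With expire-after-access, every access resets the expiry timer, so an entry that keeps being read stays alive. Likewise, with expire-after-write, every write resets the timer.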
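The difference between the two expiry modes can be shown with a toy model. This is not how Caffeine is implemented; ExpiringMap is a hypothetical class, and the current time is passed in explicitly so that the behavior is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the two expiry modes; time is supplied by the caller in milliseconds. */
class ExpiringMap {
    enum Mode { AFTER_WRITE, AFTER_ACCESS }

    private static final class Entry {
        Object value;
        long deadline;
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final Mode mode;
    private final long ttlMillis;

    ExpiringMap(Mode mode, long ttlMillis) {
        this.mode = mode;
        this.ttlMillis = ttlMillis;
    }

    /** A write always starts a fresh lifetime, in both modes. */
    void put(String key, Object value, long now) {
        Entry entry = new Entry();
        entry.value = value;
        entry.deadline = now + ttlMillis;
        entries.put(key, entry);
    }

    Object get(String key, long now) {
        Entry entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (now >= entry.deadline) {
            entries.remove(key);     // expired: behave as a miss
            return null;
        }
        if (mode == Mode.AFTER_ACCESS) {
            entry.deadline = now + ttlMillis;   // a read extends the lifetime
        }
        return entry.value;
    }
}
```

With a TTL of 100 ms and a write at time 0, a read at 60 followed by a read at 120 still hits in AFTER_ACCESS mode, while in AFTER_WRITE mode the read at 120 is a miss.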
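Besides these common policies, Caffeine also lets you choose the reference strength of keys and values, for example:
weak: supported for keys and values; an entry can be reclaimed by JVM garbage collection once its key or value is no longer strongly referenced elsewhere
soft: supported for values only; values are reclaimed only when the JVM runs low on memory
The core configuration code is: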
public CaffeineRedisCache(String name, RedisTemplate<String, Object> redisTemplate, Cache<Object, Object> caffeineCache, RedissonClient redissonClient, CacheConfigProperties cacheConfigProperties) {
super(cacheConfigProperties.isCacheNullValues());
this.name = name;
this.redisTemplate = redisTemplate;
this.caffeineCache = caffeineCache;
this.redissonClient = redissonClient;
this.cachePrefix = cacheConfigProperties.getCachePrefix();
if (CharSequenceUtil.isNotBlank(cachePrefix)) {
this.getKeyPrefix = name + ":" + cachePrefix + ":";
} else {
this.getKeyPrefix = name + ":";
}
this.defaultExpiration = cacheConfigProperties.getRedis().getDefaultExpiration();
this.defaultNullValuesExpiration = cacheConfigProperties.getRedis().getDefaultNullValuesExpiration();
this.expires = cacheConfigProperties.getRedis().getExpires();
this.topic = cacheConfigProperties.getRedis().getTopic();
this.serverId = cacheConfigProperties.getServerId();
}
public Caffeine<Object, Object> caffeineCacheBuilder() {
Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
doIfPresent(cacheConfigProperties.getCaffeine().getExpireAfterAccess(), cacheBuilder::expireAfterAccess);
doIfPresent(cacheConfigProperties.getCaffeine().getExpireAfterWrite(), cacheBuilder::expireAfterWrite);
doIfPresent(cacheConfigProperties.getCaffeine().getRefreshAfterWrite(), cacheBuilder::refreshAfterWrite);
if (cacheConfigProperties.getCaffeine().getInitialCapacity() > 0) {
cacheBuilder.initialCapacity(cacheConfigProperties.getCaffeine().getInitialCapacity());
}
if (cacheConfigProperties.getCaffeine().getMaximumSize() > 0) {
cacheBuilder.maximumSize(cacheConfigProperties.getCaffeine().getMaximumSize());
}
if (cacheConfigProperties.getCaffeine().getKeyStrength() != null) {
switch (cacheConfigProperties.getCaffeine().getKeyStrength()) {
case WEAK:
cacheBuilder.weakKeys();
break;
case SOFT:
throw new UnsupportedOperationException("caffeine does not support soft keys");
default:
}
}
if (cacheConfigProperties.getCaffeine().getValueStrength() != null) {
switch (cacheConfigProperties.getCaffeine().getValueStrength()) {
case WEAK:
cacheBuilder.weakValues();
break;
case SOFT:
cacheBuilder.softValues();
break;
default:
}
}
cacheBuilder.removalListener((key, value, cause) -> log.debug("Cache entry removed, key===>{},value===>{},cause===>{}", key, value, cause));
return cacheBuilder;
}
Core code
To combine Caffeine and Redis, I wrote a custom extension of Spring Cache's abstract adapter (AbstractValueAdaptingCache).
Main code of CaffeineRedisCache:
@Slf4j
@Getter
public class CaffeineRedisCache extends AbstractValueAdaptingCache implements Cache<Object, Object> {
private final String name;
private final Cache<Object, Object> caffeineCache;
private final RedisTemplate<String, Object> redisTemplate;
private final RedissonClient redissonClient;
private final String cachePrefix;
private final String getKeyPrefix;
private final Duration defaultExpiration;
private final Duration defaultNullValuesExpiration;
private final Map<String, Duration> expires;
private final String topic;
private final String serverId;
private final Map<String, ReentrantLock> keyLockMap = new ConcurrentHashMap<>();
public CaffeineRedisCache(String name, RedisTemplate<String, Object> redisTemplate, Cache<Object, Object> caffeineCache, RedissonClient redissonClient, CacheConfigProperties cacheConfigProperties) {
super(cacheConfigProperties.isCacheNullValues());
this.name = name;
this.redisTemplate = redisTemplate;
this.caffeineCache = caffeineCache;
this.redissonClient = redissonClient;
this.cachePrefix = cacheConfigProperties.getCachePrefix();
if (CharSequenceUtil.isNotBlank(cachePrefix)) {
this.getKeyPrefix = name + ":" + cachePrefix + ":";
} else {
this.getKeyPrefix = name + ":";
}
this.defaultExpiration = cacheConfigProperties.getRedis().getDefaultExpiration();
this.defaultNullValuesExpiration = cacheConfigProperties.getRedis().getDefaultNullValuesExpiration();
this.expires = cacheConfigProperties.getRedis().getExpires();
this.topic = cacheConfigProperties.getRedis().getTopic();
this.serverId = cacheConfigProperties.getServerId();
}
@Override
public @Nullable Object getIfPresent(@NonNull Object key) {
ValueWrapper valueWrapper = get(key);
if (Objects.isNull(valueWrapper)) {
return null;
}
return valueWrapper.get();
}
@Override
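public ValueWrapper putIfAbsent(@NotNull Object key, Object value) {
Object preValue = null;
RLock lock = redissonClient.getLock(String.valueOf(key));
boolean locked = false;
try {
// tryLock returns false when the lock is not acquired within the wait time
locked = lock.tryLock(1, 3, TimeUnit.SECONDS);
if (locked) {
preValue = getRedisValue(key);
if (preValue == null) {
doPut(key, value);
}
}
} catch (InterruptedException e) {
log.error("Failed to acquire distributed lock, errMsg===>{}", e.getMessage(), e);
Thread.currentThread().interrupt();
} finally {
// Unlock only when this thread holds the lock; otherwise unlock() throws IllegalMonitorStateException
if (locked && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
return toValueWrapper(preValue);
}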
protected Object getRedisValue(Object key) {
return redisTemplate.opsForValue().get(getKey(key));
}
@Override
public @Nullable Object get(@NonNull Object key, @NonNull Function<? super Object, ?> function) {
return get(key, (Callable<Object>) () -> function.apply(key));
}
@Override
@SuppressWarnings("unchecked")
public @NonNull Map<@NonNull Object, @NonNull Object> getAllPresent(@NonNull Iterable<@NonNull ?> iterable) {
GetAllContext context = new GetAllContext((Iterable<Object>) iterable);
doGetAll(context);
Map<Object, Object> cachedKeyValues = context.cachedKeyValues;
Map<Object, Object> result = new HashMap<>(cachedKeyValues.size(), 1);
cachedKeyValues.forEach((k, v) -> result.put(k, fromStoreValue(v)));
return result;
}
@Override
@SuppressWarnings("unchecked")
public @NonNull Map<Object, Object> getAll(@NonNull Iterable<?> keys, @NonNull Function<Iterable<?>, @NonNull Map<Object, Object>> mappingFunction) {
GetAllContext context = new GetAllContext((Iterable<Object>) keys);
context.saveRedisAbsentKeys = true;
doGetAll(context);
int redisAbsentCount = context.redisAbsentCount;
Map<Object, Object> cachedKeyValues = context.cachedKeyValues;
if (redisAbsentCount == 0) {
// every key was found in the cache
Map<Object, Object> result = new HashMap<>(cachedKeyValues.size(), 1);
cachedKeyValues.forEach((k, v) -> result.put(k, fromStoreValue(v)));
return result;
}
// load the missing values through mappingFunction
Map<?, ?> mappingKeyValues = mappingFunction.apply(context.redisAbsentKeys);
putAll(mappingKeyValues);
Map<Object, Object> result = new HashMap<>(cachedKeyValues.size() + mappingKeyValues.size(), 1);
cachedKeyValues.forEach((k, v) -> result.put(k, fromStoreValue(v)));
result.putAll(mappingKeyValues);
return result;
}
@Override
public void put(@NonNull Object key, Object value) {
if (!super.isAllowNullValues() && Objects.isNull(value)) {
this.evict(key);
return;
}
doPut(key, value);
}
@Override
public void putAll(@NonNull Map<?, ?> map) {
redisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
@SuppressWarnings("unchecked")
public <K, V> Object execute(@NonNull RedisOperations<K, V> operations) throws DataAccessException {
ValueOperations<String, Object> valueOperations = (ValueOperations<String, Object>) operations
.opsForValue();
map.forEach((k, v) -> {
Object o = toStoreValue(v);
Duration expire = getExpire(o);
setRedisValue(k, o, expire, valueOperations);
setCaffeineValue(k, o);
});
return null;
}
});
push(new ArrayList<>(map.keySet()), CACHE_OPERATE_ENUM.BATCH_EVICT);
}
@Override
public void invalidate(@NonNull Object key) {
evict(key);
}
@Override
public void invalidateAll(@NonNull Iterable<@NonNull ?> iterable) {
Collection<?> keysColl = CollUtil.toCollection(iterable);
Collection<Object> redisKeys = CollUtil.trans(keysColl, this::getKey);
redisTemplate.delete(redisKeys.stream().map(Objects::toString).collect(Collectors.toList()));
push(keysColl, CACHE_OPERATE_ENUM.BATCH_EVICT);
caffeineCache.invalidateAll(keysColl);
}
@Override
public void invalidateAll() {
this.clear();
}
@Override
public @NonNegative long estimatedSize() {
return caffeineCache.estimatedSize();
}
@Override
public @NonNull CacheStats stats() {
return caffeineCache.stats();
}
@Override
public @NonNull ConcurrentMap<@NonNull Object, @NonNull Object> asMap() {
return caffeineCache.asMap();
}
@Override
public void cleanUp() {
caffeineCache.cleanUp();
}
@Override
public @NonNull Policy<Object, Object> policy() {
return caffeineCache.policy();
}
@Override
protected Object lookup(@NotNull Object key) {
Object cacheKey = getKey(key);
Object value = getCaffeineValue(key);
if (Objects.nonNull(value)) {
log.debug("Cache hit in caffeine, key===>{}", cacheKey);
return value;
}
value = getRedisValue(key);
if (Objects.nonNull(value)) {
log.debug("Cache hit in redis, key===>{}", cacheKey);
setCaffeineValue(key, value);
}
return value;
}
protected Object getCaffeineValue(Object key) {
return caffeineCache.getIfPresent(key);
}
@NotNull
@Override
public Object getNativeCache() {
return this;
}
@SuppressWarnings("unchecked")
@Override
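public <T> T get(@NotNull Object key, @NotNull Callable<T> valueLoader) {
Object value = lookup(key);
if (Objects.nonNull(value)) {
// lookup returns the stored form, so convert NullValue back to null
return (T) fromStoreValue(value);
}
ReentrantLock lock = keyLockMap.computeIfAbsent(key.toString(), s -> {
log.info("Creating lock for ={}=", s);
return new ReentrantLock();
});
// Acquire the lock before the try block so that finally never unlocks a lock that was not taken
lock.lock();
try {
// Double-check: another thread may have loaded the value while this one was waiting
value = lookup(key);
if (Objects.nonNull(value)) {
return (T) fromStoreValue(value);
}
value = valueLoader.call();
// put converts to the stored form itself and handles null according to allowNullValues
put(key, value);
return (T) value;
} catch (Exception e) {
// Wrap the exception itself; e.getCause() is often null
throw new ValueRetrievalException(key, valueLoader, e);
} finally {
lock.unlock();
}
}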
private void doPut(Object key, Object value) {
value = toStoreValue(value);
Duration expire = getExpire(value);
setRedisValue(key, value, expire);
push(key);
setCaffeineValue(key, value);
}
protected void setCaffeineValue(Object key, Object value) {
caffeineCache.put(key, value);
}
protected void push(Object key) {
push(key, CACHE_OPERATE_ENUM.EVICT);
}
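protected void push(Object key, CACHE_OPERATE_ENUM operation) {
// Caution: String.valueOf turns a null key into the string "null" and a key collection into "[a, b]",
// so the receiving clearLocal can neither detect "clear all" nor cast the key to Iterable for BATCH_EVICT
push(new CacheMessageTO(this.serverId, this.name, operation, String.valueOf(key)));
}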
protected void push(CacheMessageTO message) {
redisTemplate.convertAndSend(topic, message);
}
protected Duration getExpire(Object value) {
Duration cacheNameExpire = expires.get(this.name);
if (cacheNameExpire == null) {
cacheNameExpire = defaultExpiration;
}
if ((value == null || value == NullValue.INSTANCE) && this.defaultNullValuesExpiration != null) {
cacheNameExpire = this.defaultNullValuesExpiration;
}
return cacheNameExpire;
}
protected void setRedisValue(Object key, Object value, Duration expire) {
setRedisValue(key, value, expire, redisTemplate.opsForValue());
}
protected void setRedisValue(Object key, Object value, Duration expire,
ValueOperations<String, Object> valueOperations) {
if (!expire.isNegative() && !expire.isZero()) {
valueOperations.set((String) getKey(key), value, expire);
} else {
valueOperations.set((String) getKey(key), value);
}
}
protected Object getKey(Object key) {
return this.getKeyPrefix + key;
}
@Override
public void evict(@NotNull Object key) {
redisTemplate.delete((String) getKey(key));
push(key);
caffeineCache.invalidate(key);
}
@Override
public void clear() {
Set<String> keys = redisTemplate.keys(this.name.concat(":*"));
if (keys != null && !keys.isEmpty()) {
redisTemplate.delete(keys);
}
push((Object) null);
caffeineCache.invalidateAll();
}
public void clearLocal(Object key) {
log.info("Clearing local cache, key===>{}", key);
if (Objects.isNull(key)) {
caffeineCache.invalidateAll();
} else {
caffeineCache.invalidate(key);
}
}
public void clearLocalBatch(Iterable<Object> keys) {
log.info("Clearing local cache, keys===>{}", keys);
caffeineCache.invalidateAll(keys);
}
@SuppressWarnings("unchecked")
protected void doGetAll(GetAllContext context) {
context.cachedKeyValues = caffeineCache.getAll(context.allKeys, keyIterable -> {
Collection<Object> caffeineAbsentKeys = CollUtil.toCollection((Iterable<Object>) keyIterable);
Collection<Object> redisKeys = CollUtil.trans(caffeineAbsentKeys, this::getKey);
// batch get from redis
List<Object> redisValues = redisTemplate.opsForValue().multiGet(redisKeys.stream().map(Object::toString).collect(Collectors.toList()));
Objects.requireNonNull(redisValues);
// count the keys that are missing from redis
int redisAbsentCount = 0;
for (Object value : redisValues) {
if (value == null) {
redisAbsentCount++;
}
}
context.redisAbsentCount = redisAbsentCount;
HashMap<Object, Object> result = new HashMap<>(caffeineAbsentKeys.size() - redisAbsentCount, 1);
boolean saveCacheAbsentKeys = context.saveRedisAbsentKeys;
if (saveCacheAbsentKeys) {
// argument for mappingFunction
context.redisAbsentKeys = new HashSet<>(redisAbsentCount);
}
int index = 0;
for (Object key : caffeineAbsentKeys) {
Object redisValue = redisValues.get(index);
if (redisValue != null) {
result.put(key, redisValue);
} else if (saveCacheAbsentKeys) {
context.redisAbsentKeys.add(key);
}
index++;
}
return result;
});
}
protected static class GetAllContext {
public GetAllContext(Iterable<Object> allKeys) {
this.allKeys = allKeys;
}
protected Iterable<Object> allKeys;
/**
* Whether to save the keys not found in redis to {@link #redisAbsentKeys}
*/
protected boolean saveRedisAbsentKeys = false;
/**
* Keys not found in redis
*/
protected Set<Object> redisAbsentKeys;
/**
* Number of keys not found in redis
*/
protected int redisAbsentCount;
/**
* Key-value pairs cached in caffeine and redis, not yet converted by {@link #fromStoreValue}
*/
protected Map<Object, Object> cachedKeyValues;
}
}
Extended implementation of the cache manager, CaffeineRedisCacheManager:
@Slf4j
@Data
public class CaffeineRedisCacheManager implements ICaffeineRedisCacheManager {
private Map<String, Cache> cacheMap = new ConcurrentHashMap<>();
private Set<String> cacheNames;
private boolean dynamic;
private CacheConfigProperties cacheConfigProperties;
private RedissonClient redissonClient;
private RedisTemplate<String, Object> redisTemplate;
private CacheLoader<Object, Object> cacheLoader;
private String serverId;
public CaffeineRedisCacheManager(CacheConfigProperties cacheConfigProperties, RedisTemplate<String, Object> redisTemplate, CacheLoader<Object, Object> cacheLoader) {
super();
this.cacheConfigProperties = cacheConfigProperties;
this.redisTemplate = redisTemplate;
this.serverId = cacheConfigProperties.getServerId();
this.dynamic = cacheConfigProperties.isDynamic();
this.cacheNames = cacheConfigProperties.getCacheNames();
this.cacheLoader = cacheLoader;
}
@Override
@SuppressWarnings("unchecked")
public <K, V> com.github.benmanes.caffeine.cache.Cache<K, V> getCaffeineCache(String name) {
return (com.github.benmanes.caffeine.cache.Cache<K, V>) getCache(name);
}
@Override
public org.springframework.cache.Cache getCache(@NotNull String name) {
Cache cache = cacheMap.get(name);
if (Objects.nonNull(cache)) {
return cache;
}
if (!dynamic && !cacheNames.contains(name)) {
return null;
}
// computeIfAbsent is atomic on ConcurrentHashMap, so each named cache is created
// at most once; a ReentrantLock created per call would not guard anything
return cacheMap.computeIfAbsent(name, n -> {
log.info("Creating new cache instance, name===>{}", n);
return createCaffeineRedisCache(n, cacheLoader);
});
}
private Cache createCaffeineRedisCache(String name, CacheLoader<Object, Object> cacheLoader) {
return new CaffeineRedisCache(name, redisTemplate, caffeineCache(cacheLoader), redissonClient, cacheConfigProperties);
}
public com.github.benmanes.caffeine.cache.Cache<Object, Object> caffeineCache(CacheLoader<Object, Object> cacheLoader) {
return caffeineCacheBuilder().build(cacheLoader);
}
@NotNull
@Override
public Collection<String> getCacheNames() {
return this.cacheNames;
}
public void clearLocal(String cacheName, Object key) {
clearLocal(cacheName, key, CACHE_OPERATE_ENUM.EVICT);
}
@Override
public void clearAll(String cacheName) {
Cache cache = cacheMap.get(cacheName);
if (cache == null) {
return;
}
CaffeineRedisCache redisCaffeineCache = (CaffeineRedisCache) cache;
redisCaffeineCache.clear();
}
@SuppressWarnings("unchecked")
public void clearLocal(String cacheName, Object key, CACHE_OPERATE_ENUM operation) {
Cache cache = cacheMap.get(cacheName);
if (cache == null) {
return;
}
CaffeineRedisCache redisCaffeineCache = (CaffeineRedisCache) cache;
if (CACHE_OPERATE_ENUM.BATCH_EVICT.equals(operation)) {
redisCaffeineCache.clearLocalBatch((Iterable<Object>) key);
} else {
redisCaffeineCache.clearLocal(key);
}
}
public Caffeine<Object, Object> caffeineCacheBuilder() {
Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
doIfPresent(cacheConfigProperties.getCaffeine().getExpireAfterAccess(), cacheBuilder::expireAfterAccess);
doIfPresent(cacheConfigProperties.getCaffeine().getExpireAfterWrite(), cacheBuilder::expireAfterWrite);
doIfPresent(cacheConfigProperties.getCaffeine().getRefreshAfterWrite(), cacheBuilder::refreshAfterWrite);
if (cacheConfigProperties.getCaffeine().getInitialCapacity() > 0) {
cacheBuilder.initialCapacity(cacheConfigProperties.getCaffeine().getInitialCapacity());
}
if (cacheConfigProperties.getCaffeine().getMaximumSize() > 0) {
cacheBuilder.maximumSize(cacheConfigProperties.getCaffeine().getMaximumSize());
}
if (cacheConfigProperties.getCaffeine().getKeyStrength() != null) {
switch (cacheConfigProperties.getCaffeine().getKeyStrength()) {
case WEAK:
cacheBuilder.weakKeys();
break;
case SOFT:
throw new UnsupportedOperationException("caffeine does not support soft keys");
default:
}
}
if (cacheConfigProperties.getCaffeine().getValueStrength() != null) {
switch (cacheConfigProperties.getCaffeine().getValueStrength()) {
case WEAK:
cacheBuilder.weakValues();
break;
case SOFT:
cacheBuilder.softValues();
break;
default:
}
}
cacheBuilder.removalListener((key, value, cause) -> log.debug("Cache entry removed, key===>{},value===>{},cause===>{}", key, value, cause));
return cacheBuilder;
}
protected static void doIfPresent(Duration duration, Consumer<Duration> consumer) {
if (duration != null && !duration.isNegative()) {
consumer.accept(duration);
}
}
}
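CaffeineRedisCacheManager is an additional implementation of the Spring CacheManager and is registered as the only CacheManager bean in the Spring context.
Finally, it is wired in through auto-configuration.
Usage is transparent: enabling Spring Cache with its own @EnableCaching annotation is all that is needed.
Auto-configuration class: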
@AutoConfiguration(after = RedisAutoConfiguration.class)
@EnableConfigurationProperties(CacheConfigProperties.class)
public class MultiCacheAutoConfiguration {
@Bean("caffeineRedisCacheManager")
@ConditionalOnMissingBean
@ConditionalOnBean(RedisTemplate.class)
public CaffeineRedisCacheManager cacheManager(CacheConfigProperties cacheConfigProperties, RedisTemplate<String, Object> redisTemplate, ObjectProvider<ICaffeineRedisCacheManagerCustomizer> cacheManagerCustomizers, ObjectProvider<IServerIdGenerator> serverIdGenerators, CacheLoader<Object, Object> cacheLoader) {
String serverId = cacheConfigProperties.getServerId();
if (CharSequenceUtil.isBlank(serverId)) {
serverIdGenerators.ifAvailable(s -> cacheConfigProperties.setServerId((String) s.get()));
}
CaffeineRedisCacheManager cacheManager = new CaffeineRedisCacheManager(cacheConfigProperties, redisTemplate, cacheLoader);
cacheManagerCustomizers.orderedStream().forEach(c -> c.customize(cacheManager));
return cacheManager;
}
@Bean
@ConditionalOnMissingBean
public CacheLoader<Object, Object> cacheLoader() {
return new CacheLoader<Object, Object>() {
@Override
public Object load(@NotNull Object key) {
return null;
}
@Override
public Object reload(@NotNull Object key, @NotNull Object oldValue) {
return oldValue;
}
};
}
@Bean
@ConditionalOnMissingBean(name = "cacheMessageListenerContainer")
public RedisMessageListenerContainer cacheMessageListenerContainer(CacheConfigProperties cacheConfigProperties, RedisTemplate<String, Object> redisTemplate, CacheMessageSubscription cacheMessageSubscription) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(Objects.requireNonNull(redisTemplate.getConnectionFactory()));
redisMessageListenerContainer.addMessageListener(cacheMessageSubscription, new ChannelTopic(cacheConfigProperties.getRedis().getTopic()));
return redisMessageListenerContainer;
}
@Bean
@SuppressWarnings("unchecked")
@ConditionalOnMissingBean(name = "cacheMessageSubscription")
public CacheMessageSubscription cacheMessageSubscription(RedisTemplate<String, Object> redisTemplate, CaffeineRedisCacheManager caffeineRedisCacheManager) {
return new CacheMessageSubscription((RedisSerializer<Object>) redisTemplate.getValueSerializer(), caffeineRedisCacheManager);
}
@Bean
@ConditionalOnMissingBean(IServerIdGenerator.class)
public IServerIdGenerator redisSequenceServerIdGenerator(
RedisTemplate<String, Object> redisTemplate,
CacheConfigProperties cacheConfigProperties) {
return new RedisSequenceIServerIdGenerator(redisTemplate, cacheConfigProperties);
}
}