LRU缓存

LRU简介

LRULeast Recently Used 的缩写,即最不经常用、最长时间未使用,在操作系统中是一种常用的页面置换算法,选择最近最久未使用的页面予以淘汰。该算法赋予每个页面一个访问字段,用来记录一个页面自上次被访问以来所经历的时间 $t$,当须淘汰一个页面时,选择现有页面中其 $t$ 值最大的,即最久未使用的页面予以淘汰。

这里说的缓存是一种广义的概念,在计算机存储层次结构中,低一层的存储器都可以看做是高一层的缓存。

计算机系统存储层次示意图

缓存可以有效地解决存储器性能与容量的矛盾,但如果缓存算法设计不当,非但不能提高访问速度,反而会使系统变得更慢。

从本质上来说,缓存之所以有效是因为程序和数据的局部性locality)。程序会按固定的顺序执行,数据会存放在连续的内存空间并反复读写。这些特点使得我们可以缓存那些经常用到的数据,从而提高读写速度。

缓存的大小是固定的,它应该只保存最常被访问的那些数据。然而未来不可预知,我们只能从过去的访问序列做预测,于是就有了各种各样的缓存替换策略。

LeetCode题目

题目描述

运用你所掌握的数据结构,设计和实现一个 LRU (最久未使用) 缓存机制。它应该支持以下操作: 获取数据 get 和 写入数据 put

  • 获取数据 get(key):如果关键字 (key) 存在于缓存中,则获取关键字的值(总是正数),否则返回 -1

  • 写入数据 put(key, value)

    • 如果关键字已经存在,则变更其数据值;
    • 如果关键字不存在,则插入该组「关键字/值」;
    • 当缓存容量达到上限时,它应该在写入新数据之前删除最久未使用的数据值,从而为新的数据值留出空间。
  • 进阶:你是否可以在 $O(1)$ 时间复杂度内完成这两种操作?

1
2
3
4
5
6
7
8
9
10
11
LRUCache cache = new LRUCache( 2 /* 缓存容量 */ );

cache.put(1, 1);
cache.put(2, 2);
cache.get(1); // 返回 1
cache.put(3, 3); // 该操作会使得关键字 2 作废
cache.get(2); // 返回 -1 (未找到)
cache.put(4, 4); // 该操作会使得关键字 1 作废
cache.get(1); // 返回 -1 (未找到)
cache.get(3); // 返回 3
cache.get(4); // 返回 4

解题思路

实现本题的两种操作,需要用到一个哈希表和一个双向链表。在Java语言中,LinkedHashMap拥有这种数据结构。而且利用这内置的数据结构很容易就可以实现LRU缓存的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class LRUCache extends LinkedHashMap<Integer, Integer> {

private int capacity;

public LRUCache(int capacity) {
super(capacity, 0.75f, true);
this.capacity = capacity;
}

public int get(int key) {
return super.getOrDefault(key, -1);
}

public void put(int key, int value) {
super.put(key, value);
}

@Override
protected boolean removeEldestEntry(Map.Entry<Integer, Integer> eldest) {
return size() > capacity;
}
}

上面的代码虽然能够通过 LeetCode 的测试,但是如果在面试的时候面试官更想要看到的应该是候选人手动实现 LRU 的功能,而不是直接调用JDK

通过上面的内容可以了解到LRU 缓存机制主要用到了哈希表双向链表,在这个过程中:

  • 哈希表:通过缓存数据的key在 $O(1)$ 的时间内获得Node在双向链表中的位置,结构其实是 Map<Integer, Node>
  • 双向链表:按照被使用的顺序存储键值对(结点),靠近 head 的结点是最近刚刚使用的,而靠近 tail 的结点则是最长时间未使用的。

这样一来,我们需要做的就是:

  • 使用哈希表进行定位,找出缓存的数据在双向链表中的位置;
  • 根据进行的操作 get/put 以及容量 capacity 维护缓存数据在哈希表和双向链表中存储
    • 把最近访问过的数据项移动到链表头部;
    • 在哈希表中 insert/update/delete 数据项。

算法流程:

  1. 创建双向链表的结点Node
  2. 创建双向链表用于维护结点的使用信息
    1. get(key) 操作
      1. 如果 key 不存在,则返回 -1
      2. 如果 key 存在,则 key 对应的结点是刚刚被使用过的结点,通过哈希表定位到该结点在双向链表中的位置,并将其移动到双向链表的头部,最后返回该结点的值。
    2. put(key, value)操作
      1. key == null,使用 key-value 创建一个新的结点 node,添加该结点到双向链表头部 head,并将 key-node 添加到哈希表中。然后判断结点数量是否超出容量,如果超出,则删除双向链表的尾部结点,并删除哈希表中其对应的映射。
      2. key != null,与 get 操作类似,先通过哈希表定位,再将对应结点的值更新为 value,并将该结点移动到双向链表的头部。

哨兵结点:

在双向链表的实现中,使用伪头部 dummy head 和伪尾部 dummy tail 标记界限,这样在添加和删除结点的时候就不需要再检查相邻的结点是否存在。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public class LRUCache {

private static class DLinkedNode {

private int key, value;

private DLinkedNode pre, next;

public DLinkedNode() {}

public DLinkedNode(int key, int value) {
this.key = key;
this.value = value;
}
}

private Map<Integer, DLinkedNode> cache = new HashMap<>();

private int size; // 缓存中实际存储的元素数量

private int capacity; // 缓存容量

private DLinkedNode head, tail;

public LRUCache(int capacity) {
this.size = 0;
this.capacity = capacity;
head = new DLinkedNode();
tail = new DLinkedNode();
head.next = tail;
tail.pre = head;
}

public int get(int key) {
DLinkedNode node = cache.get(key);
if (node == null) return -1;
// 不空, 将获取到的结点放到双向链表头部
moveToHead(node);
return node.value;
}

public void put(int key, int value) {
if (capacity == 0)
return;
DLinkedNode node = cache.get(key);
if (node == null) {
// 创建新结点并插入到双向链表头部, 存入 cache 中。
DLinkedNode newNode = new DLinkedNode(key, value);
cache.put(key, newNode);
addToHead(newNode);
++size;
if (size > capacity) {
// 删除双向链表尾部结点 同时清除 cache 中的记录。
DLinkedNode tailNode = removeTail();
cache.remove(tailNode.key);
--size;
}
} else {
// 已经存在 key , 修改 value, 移动到头部。
node.value = value;
moveToHead(node);
}
}

private void moveToHead(DLinkedNode node) {
removeNode(node); // 1. 在原链表中移除当前位置的结点, 否则只是移动到头部的话会产生重复。
addToHead(node); // 2. 移动到头部。
}

private void removeNode(DLinkedNode node) {
node.pre.next = node.next;
node.next.pre = node.pre;
}

private void addToHead(DLinkedNode node) {
// 又是经典的双向链表插入结点问题了
node.pre = head;
node.next = head.next;
head.next.pre = node;
head.next = node;
}

private DLinkedNode removeTail() {
DLinkedNode tailNode = tail.pre;
removeNode(tailNode);
return tailNode;
}
}
  • 时间复杂度:get()、put()都是 $O(1)$。
  • 空间复杂度:$O(capacity)$,哈希表和双向链表最多存储 capacity + 1 个元素。

上面的代码应该可以让面试官比较满意了,但是如果想更加突出自己的过人之处还该考虑点什么?

  • 哨兵 (head、tail) 已经考虑;

  • 线程安全 (synchronized 可行考虑);

  • 吞吐量 (CAS操作) 可以再提高一点吗?

  • 考虑使用(ConcurrentHashMap) 分段锁;

  • 如果容忍一定的准确性损失 (ReadWriteLock);

  • 环形 LRU(Disruptor Ring Buffer)

  • 提前分配空间的 LRU

  • 分布式的 LRU 应该如何设计?

线程安全并且带有定时任务的LRU缓存

ConcurrentLinkedQueue

ConcurrentLinkedQueue 是一个基于单向链表的无界无锁线程安全的队列,适合在高并发环境下使用,效率比较高。在使用时,可以把它理解为基本的队列,但是保证了多线程下的安全性。和普通队列一样,它也是按照先进先出(FIFO)的规则对接点进行排序。 另外,队列元素中不可以放置 null 元素。

ConcurrentLinkedQueue 整个继承关系如下图所示:

ConcurrentLinkedQueue继承与实现关系

ConcurrentLinkedQueue 中最主要的两个方法是:offer(value)poll(),分别实现队列的两个重要的操作:入队和出队。(offer(value) 基本等价于 add(value))。

当添加一个元素到队列的时候,它会添加到队列的尾部,当获取一个元素时,它会返回队列头部的元素。

利用 ConcurrentLinkedQueue 先进先出的特性,每当 put/get (缓存被使用)元素的时候,就将元素存放在队列尾部,这样能够保证头部的元素是最近最少使用的。

ReadWriteLock

ReadWriteLock 是一个接口,位于java.util.concurrent.locks包下,里面只有两个方法分别返回读锁和写锁:

1
2
3
4
5
6
7
8
9
10
11
public interface ReadWriteLock {
/**
* @return the lock used for reading
*/
Lock readLock();

/**
* @return the lock used for writing
*/
Lock writeLock();
}

ReentrantReadWriteLockReadWriteLock 接口的具体实现类。

读写锁比较适合缓存这种读多写少的场景。读写锁可以保证多个线程同时读取,但是只有一个线程可以写入。但是当读锁被线程持有的时候,读锁是无法被其他线程申请,会处于阻塞状态,直至读锁被释放。该锁也是可重入锁。

另外,同一个线程持有写锁时可以继续申请读锁,但是持有读锁的时候不可以申请写锁。

ScheduledExecutorService

ScheduledExecutorService 是一个接口,ScheduledThreadPoolExecutor 是其主要实现类。

ScheduledExecutorService继承与实现关系

ScheduledThreadPoolExecutor 主要用来在给定的延迟后运行任务,或者定期执行任务。 这个在实际项目用到的比较少,因为有其他方案选择比如 quartz。但是,在一些需求比较简单的场景下还是非常有用的!

ScheduledThreadPoolExecutor 使用的任务队列 DelayQueue 封装了一个 PriorityQueuePriorityQueue 会对队列中的任务进行排序,执行所需时间短的放在前面先被执行,如果执行所需时间相同则先提交的任务将被先执行。

线程安全的LRU缓存

  • 实现方法:ConcurrentHashMap + ConcurrentLinkedQueue + ReadWriteLock

这里可能会有疑问的点是:在LeetCode中选择使用双向链表来维护结点之前按使用时间的关系,也可以实现在 $O(1)$ 的时间内完成将结点的移动和删除操作,但是这里用队列来保存结点,如果刚刚访问了队列中间的结点,那这个结点之前或之后的结点该如何处理呢?

这里的做法是:每当 put/get 元素的时候,就将这个元素对应的 key 放到队尾,这样就能保证队列头部的元素就是最近最少使用的,当缓存容量不够的时候,直接移除队列头部的key以及这个key对应的缓存即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public class LRUCache<K, V> {

private final int maxCapacity;

private ConcurrentHashMap<K, V> cacheMap;

private ConcurrentLinkedQueue<K> keys;

// 读写锁
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

private Lock writeLock = readWriteLock.writeLock();

private Lock readLock = readWriteLock.readLock();

public LRUCache(int maxCapacity) {
if (maxCapacity < 0)
throw new IllegalArgumentException("Illegal max capacity" + maxCapacity);
this.maxCapacity = maxCapacity;
cacheMap = new ConcurrentHashMap<>(maxCapacity);
keys = new ConcurrentLinkedQueue<>();
}

public V put(K key, V value) {
// 加写锁
writeLock.lock();
try {
// 1. 判断 key 是否存在于当前缓存
if (cacheMap.containsKey(key)) {
moveToTail(key);
cacheMap.put(key, value);
return value;
}
// 2. 是否超出缓存容量,超出的话就移除队列头部的元素以及其对应的缓存
if (cacheMap.size() == maxCapacity) {
System.out.println("reached to maxCapacity");
removeEldestKey();
}
// 3. key 不存在于当前缓存 将 key 添加到队列的尾部并且缓存 key 及其对应的元素
keys.offer(key);
cacheMap.put(key, value);
return value;
} finally {
writeLock.unlock();
}
}

public V get(K key) {
readLock.lock();
try {
// 1. 判断 key 是否存在于缓存
if (cacheMap.containsKey(key)) {
// 2 存在的话将 key 移动到队列尾部
moveToTail(key);
return cacheMap.get(key);
}
return null;
} finally {
readLock.unlock();
}
}

public V remove(K key) {
writeLock.lock();
try {
// 1. 判断 key 是否存在于缓存
if (cacheMap.containsKey(key)) {
keys.remove(key);
return cacheMap.remove(key);
}
return null;
} finally {
writeLock.unlock();
}
}

private void removeEldestKey() {
K eldestKey = keys.poll();
if (eldestKey != null)
cacheMap.remove(eldestKey);
}

/**
* 将元素添加到队列的尾部(put/get的时候执行)
*/
private void moveToTail(K key) {
keys.remove(key);
keys.add(key);
}

private int size() {
return cacheMap.size();
}
}
  • 非并发环境测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class NonConcurrentTest  {
public static void main(String[] args) {
LRUCache<Integer, String> lruCache = new LRUCache<>(3);
lruCache.put(1, "Java");
System.out.println(lruCache.get(1)); // Java
lruCache.remove(1);
System.out.println(lruCache.get(1)); // null
lruCache.put(2, "C++");
lruCache.put(3, "Python");
// ["C++", "Python"] 队头在左 队尾在右
System.out.println(lruCache.keys); // [2, 3]
System.out.println(lruCache.get(2)); // C++
// ["Python", "C++"]
lruCache.put(4, "C");
// ["Python", "C++", "C"]
System.out.println(lruCache.keys); // [3, 2, 4]
lruCache.put(5, "PHP");
// ["C++", "C", "PHP"]
System.out.println(lruCache.get(2)); // C++
// ["C", "PHP", "C++"]
System.out.println(lruCache.keys); // [4, 5, 2]
}
}
  • 并发环境测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ConcurrentTest {
/**
* 初始化一个固定容量为 10 的线程池和 count 为 10 的 CountDownLatch。
* 将 100 次操作分 10 次添加到线程池
* 然后等待线程池执行完成 10 次操作(正常情况下会有10个线程同时执行任务,所以速度很快)。
* @param args
*/
public static void main(String[] args) throws InterruptedException {
int threadNum = 10;
int batchSize = 10;
LRUCache<String, Integer> lruCache = new LRUCache<>(batchSize * 10);
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(threadNum);
CountDownLatch countDownLatch = new CountDownLatch(threadNum);
AtomicInteger atomicInteger = new AtomicInteger(0);
long startTime = System.currentTimeMillis();
for (int t = 0; t < threadNum; t++) {
fixedThreadPool.submit(() -> {
for (int i = 0; i < batchSize; i++) {
int value = atomicInteger.incrementAndGet();
lruCache.put("id:" + value, value);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
fixedThreadPool.shutdown();
System.out.println("Cache size:" + lruCache.size());
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
System.out.println(String.format("Time cost:%dms", duration));
}
}

线程安全并且带有定时任务的LRU缓存

实际上就是在上面实现的LRU的基础上加上一个定时任务去删除缓存,实现定时任务的方式有很多种:

  • Timer :不被推荐,多线程会存在问题。
  • ScheduledExecutorService :定时器线程池,可以用来替代 Timer
  • DelayQueue :延时队列
  • quartz :一个很火的开源任务调度框架,很多其他框架都是基于 quartz 开发的,比如当当网的elastic-job就是基于quartz二次开发之后的分布式调度解决方案
  • $\dots$

因为只是一个小练习,所以选择JDK自带的ScheduledExecutorService,它基于DelayQueue做了很多封装,能够满足大部分需求。

在上面实现的基础上增加一个方法,在put操作的时候,通过这个方法就能直接设置过期时间。

1
2
3
4
5
6
7
private void removeAfterExpire(K key, long expireTime) {
scheduledExecutorService.schedule(() -> {
//过期后清除该键值对
cacheMap.remove(key);
keys.remove(key);
}, expireTime, TimeUnit.MILLISECONDS);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public V put(K key, V value, long expireTime) {
// 加写锁
writeLock.lock();
try {
//1.key是否存在于当前缓存
if (cacheMap.containsKey(key)) {
moveToTail(key);
cacheMap.put(key, value);
return value;
}
//2.是否超出缓存容量,超出的话就移除队列头部的元素以及其对应的缓存
if (cacheMap.size() == maxCapacity) {
System.out.println("maxCapacity of cache reached");
removeEldestKey();
}
//3.key不存在于当前缓存。将key添加到队列的尾部并且缓存key及其对应的元素
keys.add(key);
cacheMap.put(key, value);
if (expireTime > 0)
removeAfterExpireTime(key, expireTime);
return value;
} finally {
writeLock.unlock();
}
}

存在的问题:LockConcurrentXxx一起使用显得多余。从此也可以产生两种方案

  • 读写锁加基本的HashMap、双向链表,或者synchronized同步方法。
  • 使用并发工具类ConcurrentXxxx

待改进之处:维护队列 $O(n)$,可以使用其他数据结构减少到 $O(1)$。

Redis 中的 LRU

Redis 作为缓存使用时,一些场景下要考虑内存的空间消耗问题。Redis 会删除过期键以释放空间,过期键的删除策略有两种:

  • 惰性删除:每次从键空间获取键时,都检查键是否过期,如果过期的话就删除该键,否则返回该键。
  • 定期删除:每隔一段时间,程序就对数据库进行一次检查,删除里面的过期键。

当需要从缓存中淘汰数据时,我们希望能淘汰那些将来不可能再被使用的数据,保留那些将来还会频繁访问的数据,但最大的问题是缓存并不能预言未来。一个解决方法就是通过 LRU 进行预测:

  • 最近被频繁访问的数据将来被访问的可能性也越大;
  • 缓存中的数据一般会有这样的访问分布:
    • 一部分数据拥有绝大部分的访问量;
    • 当访问模式很少改变时,可以记录每个数据的最后一次访问时间,拥有最少空闲时间的数据可以被认为将来最有可能被访问到。

LRU配置参数

Redis配置中和LRU有关的有三个:

  • maxmemory:配置 Redis 存储数据时指定限制的内存大小,比如 100m。当缓存消耗的内存超过这个数值时, 将触发数据淘汰。该数据配置为 $0$ 时,表示缓存的数据量没有限制, 即 $LRU$ 功能不生效。$64$ 位的系统默认值为 $0$,$32$ 位的系统默认内存限制为 $3GB$。
  • maxmemory_policy:触发数据淘汰后的淘汰策略。
  • maxmemory_samples:随机采样的精度,也就是随即取出 $key$ 的数目。该数值配置越大, 越接近于真实的 $LRU$ 算法,但是数值越大,相应消耗也变高,对性能有一定影响,样本值默认为 $5$ 。

内存淘汰策略

Redis 中的内存淘汰策略有:

  • volatile-lru:从已设置过期时间的数据集(server.db[i].expires)中挑选最久未使用的数据淘汰;
  • volatile-ttl:从已设置过期时间的数据集(server.db[i].expires)中挑选将要过期的数据淘汰;
  • volatile-random:从已设置过期时间的数据集(server.db[i].expires)中随机选取数据淘汰;
  • allkeys-lru:当内存不足以容纳新写入数据时,在键空间中,移除最久未使用的 key
  • allkeys-random:从数据集(server.db[i].dict)中随机选取数据淘汰;
  • no-eviction:禁止驱逐数据,也就是说当内存不足以容纳新写入数据时,新写入操作会报错。

volatile-xxx 这三个淘汰策略使用的不是全量数据,有可能无法淘汰出足够的内存空间。在没有过期键或者没有设置超时属性的键的情况下,这三种策略和 noeviction 差不多。

一般的经验规则:

  • 使用 allkeys-lru 策略:当预期请求符合一个幂次分布(二八法则等),比如一部分的子集元素比其它元素被访问的更多时,可以选择这个策略。
  • 使用 allkeys-random 策略:循环连续的访问所有的键时,或者预期请求分布平均(所有元素被访问的概率都差不多)。
  • 使用 volatile-ttl:要采取这个策略,缓存对象的 TTL 值最好有差异。

volatile-lruvolatile-random 策略,当想要使用单一Redis 实例来同时实现缓存淘汰和持久化一些经常使用的键集合时很有用。

  • 对未设置过期时间的键进行持久化保存,对设置了过期时间的键参与缓存淘汰。
  • 不过一般运行两个实例是解决这个问题的更好方法。

为键设置过期时间也是需要消耗内存的,所以使用 allkeys-lru 这种策略更加节省空间,因为这种策略下可以不为键设置过期时间。

4.0 版本后增加以下两种:

  • volatile-lfu:从已设置过期时间的数据集(server.db[i].expires)中挑选最不经常使用的数据淘汰。
  • allkeys-lfu:当内存不足以容纳新写入数据时,在键空间中,移除最不经常使用的 key

近似LRU算法

$LRU$ 算法需要使用一个双向链表来记录数据的最近被访问顺序,出于节省内存的考虑,$Redis$ 的 $LRU$ 算法并未完整实现。

$Redis$ 并不会选择最久未被访问的键进行回收,而是尝试运行一个近似 $LRU$ 算法,通过对少量键进行取样,然后回收其中的最久未被访问的键。通过调整每次回收时的采样数量 maxmemory-samples,可以实现调整算法的精度。

根据 Redis 作者的说法,每个 Redis Object 可以挤出 $24 bits$ 的空间,但 $24 bits$ 是不够存储两个指针的,而够存储一个低位时间戳,Redis Object 以秒为单位存储了对象新建或者更新时的 Unix Time,也就是 LRU Clock,$24 bits$ 数据要溢出的话需要 $194$ 天,而缓存的数据更新非常频繁,已经足够了。

Redis 的键空间是放在一个哈希表中的,要从所有的键中选出一个最久未被访问的键,需要另外一个数据结构存储这些源信息。最初,Redis只是随机地选 $3$ 个 $key$,然后从中淘汰,后来算法改进到了Nkey 的策略,默认是 $5$ 个。

Redis 3.0 之后又改善了算法的性能,提供了一个待淘汰候选 $key$ 的pool,里面默认有 $16$ 个 $key$,按照空闲时间排好序。更新时从 Redis 键空间随机选择 $N$ 个 $key$,分别计算它们的空闲时间 idle,$key$ 只会在 pool 不满或者空闲时间大于 pool 里最小的时,才会进入 pool,然后从 pool 中选择空闲时间最大的 $key$ 淘汰掉。

扩展阅读

评论