68-90缓存冷启动及缓存预热解决方案

缓存冷启动的问题

  1. 新系统第一次上线,此时在缓存里可能是没有数据的

  2. 系统在线上稳定运行着,但是突然间重要的redis缓存全盘崩溃了,而且不幸的是,数据全都无法找回来

缓存冷启动问题

缓存预热

缓存冷启动,redis启动后,一点数据都没有,直接就对外提供服务了,mysql就裸奔

解决方案:

  1. 提前给redis中灌入部分数据,再提供服务
  2. 肯定不可能将所有数据都写入redis,因为数据量太大了,第一耗费的时间太长了,第二根本redis容纳不下所有的数据
  3. 需要根据当天的具体访问情况,实时统计出访问频率较高的热数据
  4. 然后将访问频率较高的热数据写入redis中,肯定是热数据也比较多,我们也得多个服务并行读取数据去写,并行的分布式的缓存预热
  5. 然后将灌入了热数据的redis对外提供服务,这样就不至于冷启动,直接让数据库裸奔了

代码实现思路:

  1. nginx+lua将访问流量上报到kafka中

    要统计出来当前最新的实时的热数据有哪些,我们就得将商品详情页访问的请求对应的流量、日志,实时上报到kafka中

  2. storm从kafka中消费数据,实时统计出每个商品的访问次数,访问次数基于LRU内存数据结构的存储方案

    优先使用内存中的一个LRUMap去存放,优点是性能高而且没有外部依赖
    如果使用redis,要防止redis挂掉,导致数据丢失的情况。如果用mysql,扛不住高并发读写; 用hbase,hadoop生态系统,维护麻烦,太重了

    只要统计出最近一段时间访问最频繁的商品,然后对它们进行访问计数,同时维护出一个前N个访问最多的商品list即可。

    热数据:最近一段时间,比如最近1个小时,最近5分钟,1万个商品请求,统计出最近这段时间内每个商品的访问次数,排序,做出一个排名前N的list

    计算好每个task大致要存放的商品访问次数的数量,计算出大小

    然后构建一个LRUMap,apache commons collections有开源的实现,设定好map的最大大小,就会自动根据LRU算法去剔除多余的数据,保证内存使用限制

    即使有部分数据被干掉了,然后下次来重新开始计数,也没关系,因为如果它被LRU算法干掉,那么它就不是热数据,说明最近一段时间都很少访问了

  3. 每个storm task启动的时候,基于zookeeper分布式锁,将自己的id写入zookeeper同一个节点中

  4. 每个storm task负责完成自己这里的热数据的统计,每隔一段时间,就遍历一下这个map,然后维护一个前N个商品的list,更新这个list

  5. 写一个后台线程,每隔一段时间,比如1分钟,都将排名前N的热数据list,同步到zookeeper中去,存储到这个storm task对应的一个zNode中去

  6. 我们需要一个服务,比如说,代码可以跟缓存数据生产服务放一起,但是也可以放单独的服务

    服务可能部署了很多个实例

    每次服务启动的时候,就会去拿到一个storm task的列表,然后根据taskId,一个一个的去尝试获取taskId对应的zNode的zookeeper分布式锁

    如果能获取到分布式锁的话,那么就将那个storm task对应的热数据的list取出来
    然后将数据从mysql中查询出来,写入缓存中,进行缓存的预热,多个服务实例,分布式的并行的去做,基于zk分布式锁做了协调了,分布式并行缓存的预热


基于nginx+lua完成商品详情页访问流量实时上报kafka的开发

不在分发服务器做,在实际处理请求的机器上做,bigdata01,bigdata02

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#使用root用户安装
yum install -y unzip

#以下命令使用bigdata用户

#需要在nginx.conf中,http部分,加入resolver 8.8.8.8;
vi /export/servers/nginx/conf/nginx.conf
...
http {
resolver 8.8.8.8;
include mime.types;
default_type application/octet-stream;
...

#需要在kafka中加入 advertised.host.name = 192.168.33.6* ,重启三个kafka进程
vi /export/servers/kafka/config/server.properties
...
#每台机器根据本机ip修改
advertised.host.name = 192.168.33.61
...


cd /home/bigdata/software
wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip
chmod 777 /home/bigdata/software/master.zip
unzip /home/bigdata/software/master.zip
cp -rf /home/bigdata/software/lua-resty-kafka-master/lib/resty /home/bigdata/ngnix-test/lualib

#编辑lua脚本
vi /home/bigdata/ngnix-test/lua/product.lua

...
#注意不要重复引入相关lib
local cjson = require("cjson")
local producer = require("resty.kafka.producer")

local broker_list = {
{ host = "192.168.33.61", port = 9092 },
{ host = "192.168.33.62", port = 9092 },
{ host = "192.168.33.63", port = 9092 }
}

-- 将需要的数据放入json数据中
local log_json = {}
log_json["request_module"] = "product_detail_info"
log_json["headers"] = ngx.req.get_headers()
log_json["uri_args"] = ngx.req.get_uri_args()
log_json["body"] = ngx.req.read_body()
log_json["http_version"] = ngx.req.http_version()
log_json["method"] =ngx.req.get_method()
log_json["raw_reader"] = ngx.req.raw_header()
log_json["body_data"] = ngx.req.get_body_data()

-- json转string
local message = cjson.encode(log_json);

local productId = ngx.req.get_uri_args()["productId"]

-- 构建kafka producer
local async_producer = producer:new(broker_list, { producer_type = "async" })
-- 将数据发给kafka
local ok, err = async_producer:send("access-log", productId, message)

if not ok then
ngx.log(ngx.ERR, "kafka send err:", err)
return
end


#使用root启动nginx或重新加载nginx
/export/servers/nginx/sbin/nginx
/export/servers/nginx/sbin/nginx -s reload
...

两台机器上都这样做,才能统一上报流量到kafka

#创建access-log topic
kafka-topics.sh --zookeeper 192.168.33.61:2181,192.168.33.62:2181,192.168.33.63:2181 --topic access-log --replication-factor 1 --partitions 1 --create

#使用kafka消费命令消费数据,查看数据产生情况
kafka-console-consumer.sh --zookeeper 192.168.33.61:2181,192.168.33.62:2181,192.168.33.63:2181 --topic access-log --from-beginning

#使用浏览器产生数据

http://192.168.33.61/product?requestPath=product&productId=11&shopId=1

注意事项:

  1. kafka进程挂了,可能是虚拟机的问题,杀掉进程,重新启动一下

  2. 需要启动eshop-cache缓存服务,因为nginx中的本地缓存可能不在了

热启动方案注意点-1

基于storm-kafka完成商品访问次数实时统计拓扑的开发

nignx将访问日志通过kafka发送给storm,storm消费kafka消息

基于storm完成LRUMap中topn热门商品列表的算法讲解与编写

storm消费kafka消息,并统计结果,将结果放入LRUMap中,再写算法取TopN结果

基于storm-zookeeper完成热门商品列表的分段存储

  1. 将storm自己运行的task的 taskId 写入一个 zookeeper node 中,形成 taskId 的列表

    /taskId-list,111,222,333

  2. 然后每次都将自己的热门商品列表,写入自己的 taskId 对应的 zookeeper 节点

    <”/task-hot-product-list-taskId”, 热门商品列表>

  3. 然后这样的话,并行的预热程序才能从第一步中知道,有哪些 taskId

  4. 然后并行预热程序根据每个 taskId 去获取一个锁,然后再从对应的 zNode 中拿到热门商品列表

基于双重zookeeper分布式锁完成分布式并行缓存预热的代码开发

  1. 服务启动的时候,进行缓存预热

  2. 从zk中读取 taskId 列表

  3. 依次遍历每个 taskId,尝试获取分布式锁,如果获取不到,快速报错,不要等待,因为说明已经有其他服务实例在预热了

  4. 直接尝试获取下一个 taskId 的分布式锁

  5. 即使获取到了分布式锁,也要检查一下这个 taskId 的预热状态,如果已经被预热过了,就不再预热了

  6. 执行预热操作,遍历 productId 列表,查询数据,然后写ehcache和redis

  7. 预热完成后,设置 taskId 对应的预热状态

将缓存预热解决方案的代码运行后观察效果以及调试和修复所有的bug