缓存冷启动的问题
新系统第一次上线,此时在缓存里可能是没有数据的
系统在线上稳定运行着,但是突然间重要的redis缓存全盘崩溃了,而且不幸的是,数据全都无法找回来
缓存预热
缓存冷启动,redis启动后,一点数据都没有,直接就对外提供服务了,mysql就裸奔
解决方案:
- 提前给redis中灌入部分数据,再提供服务
- 肯定不可能将所有数据都写入redis,因为数据量太大了,第一耗费的时间太长了,第二根本redis容纳不下所有的数据
- 需要根据当天的具体访问情况,实时统计出访问频率较高的热数据
- 然后将访问频率较高的热数据写入redis中,肯定是热数据也比较多,我们也得多个服务并行读取数据去写,并行的分布式的缓存预热
- 然后将灌入了热数据的redis对外提供服务,这样就不至于冷启动,直接让数据库裸奔了
代码实现思路:
nginx+lua将访问流量上报到kafka中
要统计出来当前最新的实时的热数据有哪些,我们就得将商品详情页访问的请求对应的流量、日志,实时上报到kafka中
storm从kafka中消费数据,实时统计出每个商品的访问次数,访问次数基于LRU内存数据结构的存储方案
优先使用内存中的一个LRUMap去存放,优点是性能高而且没有外部依赖
如果使用redis,要防止redis挂掉,导致数据丢失的情况。如果用mysql,扛不住高并发读写; 用hbase,hadoop生态系统,维护麻烦,太重了只要统计出最近一段时间访问最频繁的商品,然后对它们进行访问计数,同时维护出一个前N个访问最多的商品list即可。
热数据:最近一段时间,比如最近1个小时,最近5分钟,1万个商品请求,统计出最近这段时间内每个商品的访问次数,排序,做出一个排名前N的list
计算好每个task大致要存放的商品访问次数的数量,计算出大小
然后构建一个LRUMap,apache commons collections有开源的实现,设定好map的最大大小,就会自动根据LRU算法去剔除多余的数据,保证内存使用限制
即使有部分数据被干掉了,然后下次来重新开始计数,也没关系,因为如果它被LRU算法干掉,那么它就不是热数据,说明最近一段时间都很少访问了
每个storm task启动的时候,基于zookeeper分布式锁,将自己的id写入zookeeper同一个节点中
每个storm task负责完成自己这里的热数据的统计,每隔一段时间,就遍历一下这个map,然后维护一个前N个商品的list,更新这个list
写一个后台线程,每隔一段时间,比如1分钟,都将排名前N的热数据list,同步到zookeeper中去,存储到这个storm task对应的一个zNode中去
我们需要一个服务,比如说,代码可以跟缓存数据生产服务放一起,但是也可以放单独的服务
服务可能部署了很多个实例
每次服务启动的时候,就会去拿到一个storm task的列表,然后根据taskId,一个一个的去尝试获取taskId对应的zNode的zookeeper分布式锁
如果能获取到分布式锁的话,那么就将那个storm task对应的热数据的list取出来
然后将数据从mysql中查询出来,写入缓存中,进行缓存的预热,多个服务实例,分布式的并行的去做,基于zk分布式锁做了协调了,分布式并行缓存的预热
基于nginx+lua完成商品详情页访问流量实时上报kafka的开发
不在分发服务器做,在实际处理请求的机器上做,bigdata01,bigdata021
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81#使用root用户安装
yum install -y unzip
#以下命令使用bigdata用户
#需要在nginx.conf中,http部分,加入resolver 8.8.8.8;
vi /export/servers/nginx/conf/nginx.conf
...
http {
resolver 8.8.8.8;
include mime.types;
default_type application/octet-stream;
...
#需要在kafka中加入 advertised.host.name = 192.168.33.6* ,重启三个kafka进程
vi /export/servers/kafka/config/server.properties
...
#每台机器根据本机ip修改
advertised.host.name = 192.168.33.61
...
cd /home/bigdata/software
wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip
chmod 777 /home/bigdata/software/master.zip
unzip /home/bigdata/software/master.zip
cp -rf /home/bigdata/software/lua-resty-kafka-master/lib/resty /home/bigdata/ngnix-test/lualib
#编辑lua脚本
vi /home/bigdata/ngnix-test/lua/product.lua
...
#注意不要重复引入相关lib
local cjson = require("cjson")
local producer = require("resty.kafka.producer")
local broker_list = {
{ host = "192.168.33.61", port = 9092 },
{ host = "192.168.33.62", port = 9092 },
{ host = "192.168.33.63", port = 9092 }
}
-- 将需要的数据放入json数据中
local log_json = {}
log_json["request_module"] = "product_detail_info"
log_json["headers"] = ngx.req.get_headers()
log_json["uri_args"] = ngx.req.get_uri_args()
log_json["body"] = ngx.req.read_body()
log_json["http_version"] = ngx.req.http_version()
log_json["method"] =ngx.req.get_method()
log_json["raw_reader"] = ngx.req.raw_header()
log_json["body_data"] = ngx.req.get_body_data()
-- json转string
local message = cjson.encode(log_json);
local productId = ngx.req.get_uri_args()["productId"]
-- 构建kafka producer
local async_producer = producer:new(broker_list, { producer_type = "async" })
-- 将数据发给kafka
local ok, err = async_producer:send("access-log", productId, message)
if not ok then
ngx.log(ngx.ERR, "kafka send err:", err)
return
end
#使用root启动nginx或重新加载nginx
/export/servers/nginx/sbin/nginx
/export/servers/nginx/sbin/nginx -s reload
...
两台机器上都这样做,才能统一上报流量到kafka
#创建access-log topic
kafka-topics.sh --zookeeper 192.168.33.61:2181,192.168.33.62:2181,192.168.33.63:2181 --topic access-log --replication-factor 1 --partitions 1 --create
#使用kafka消费命令消费数据,查看数据产生情况
kafka-console-consumer.sh --zookeeper 192.168.33.61:2181,192.168.33.62:2181,192.168.33.63:2181 --topic access-log --from-beginning
#使用浏览器产生数据
http://192.168.33.61/product?requestPath=product&productId=11&shopId=1
注意事项:
kafka进程挂了,可能是虚拟机的问题,杀掉进程,重新启动一下
需要启动eshop-cache缓存服务,因为nginx中的本地缓存可能不在了
基于storm-kafka完成商品访问次数实时统计拓扑的开发
nignx将访问日志通过kafka发送给storm,storm消费kafka消息
基于storm完成LRUMap中topn热门商品列表的算法讲解与编写
storm消费kafka消息,并统计结果,将结果放入LRUMap中,再写算法取TopN结果
基于storm-zookeeper完成热门商品列表的分段存储
将storm自己运行的task的 taskId 写入一个 zookeeper node 中,形成 taskId 的列表
/taskId-list,111,222,333
然后每次都将自己的热门商品列表,写入自己的 taskId 对应的 zookeeper 节点
<”/task-hot-product-list-taskId”, 热门商品列表>
然后这样的话,并行的预热程序才能从第一步中知道,有哪些 taskId
然后并行预热程序根据每个 taskId 去获取一个锁,然后再从对应的 zNode 中拿到热门商品列表
基于双重zookeeper分布式锁完成分布式并行缓存预热的代码开发
服务启动的时候,进行缓存预热
从zk中读取 taskId 列表
依次遍历每个 taskId,尝试获取分布式锁,如果获取不到,快速报错,不要等待,因为说明已经有其他服务实例在预热了
直接尝试获取下一个 taskId 的分布式锁
即使获取到了分布式锁,也要检查一下这个 taskId 的预热状态,如果已经被预热过了,就不再预热了
执行预热操作,遍历 productId 列表,查询数据,然后写ehcache和redis
预热完成后,设置 taskId 对应的预热状态