异步队列主要完成了异步的事件,比如a关注b,那么系统会给b发一份站内信,则发站内信的事件就可以作为异步的,也就是说a关注b后就返回结果,而发站内信会不会立刻完成的。
这个生产者消费者模型非常相似,就是说a生产事件,b这边只有在a生产事件后才会去取,而不是一直的等着a的生产。
再比如发系统通知就是异步事件,每个用户不会只等着系统通知,而是做其他事情,留下一个线程监视着系统的通知。
异步事件还可以完成更高级的功能,比如优先级,就拿打印来说,一个用户不停地提交打印任务后,那么其他用户一直得不到打印机会,因此可以设置一个用户再次提交任务,优先级就回降低,对队列实现排序(priority queue)就可以完成带有优先级的打印了。
这里异步队列使用redis实现存储,同时我们也可以使用juc中blocking queue实现。
https://zhuanlan.zhihu.com/p/21649950
消息队列
消息队列是可以实现时间的异步功能,可以是
原理
生产消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
http://www.infoq.com/cn/articles/producers-and-consumers-mode/
订阅发布者也类似于生产消费者,也是有放有拿的问题
区别在于订阅发布者是中订阅者也能会使发布者,两者的关系不是很绝对。
那么到底什么是观察者模式(订阅发布者)呢. 先看看生活中的观察者模式。
好莱坞有句名言. “不要给我打电话, 我会给你打电话”. 这句话就解释了一个观察者模式的来龙去脉。 其中“我”是发布者, “你”是订阅者。
再举个例子,我来公司面试的时候,完事之后每个面试官都会对我说:“请留下你的联系方式, 有消息我们会通知你”。 在这里“我”是订阅者, 面试官是发布者。所以我不用每天或者每小时都去询问面试结果, 通讯的主动权掌握在了面试官手上。而我只需要提供一个联系方式。
http://www.cnblogs.com/lori/p/5413369.html
实现
生产者消费者
上图中抽象了生产者和消费者
因为消息队列中存放的事件,因此需要记录事件的信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class EventModel { //事件类型代码 private EventType type; //发出事件的id private int actorId; //这个事件的实体,比如是评论还是问题 private int entityType; private int entityId; //事件实体的拥有者,比如拥有这条评论的人 private int entityOwnerId; //事件的其他存储信息,比如可以存放优先级、时间等 private Map<String, String> exts = new HashMap<String, String>(); //set、get方法 public EventModel setType(EventType type) { this.type = type; return this; } public EventModel setActorId(int actorId) { this.actorId = actorId; return this; } public int getActorId() { return actorId; } //... //set、get方法全部是返回的是事件本身,这样就可以进行链式调用 public static void main(String[] args) { EventModel model = new EventModel(); model.setActorId(12).setEntityId(45).setEntityType(23); } }
|
不同的类都可能是消费者,因此要设计成一个处理接口,然后不同的消费类都实现这个接口,然后调用消费者
1 2 3 4 5 6 7
| public interface EventHandler { //具体的业务处理 void doHandle(EventModel model); //支持的处理对象,比如对于私信来说有可能是邮件处理也可能是站内信处理,所以设置为list List<EventType> getSupportEventTypes(); }
|
不同的事件处理类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Component public class LikeHandler implements EventHandler { @Override public void doHandle(EventModel model) { } @Override public List<EventType> getSupportEventTypes() { return Arrays.asList(EventType.LIKE); } } @Component public class FollowHandler implements EventHandler { @Override public void doHandle(EventModel model) { } @Override public List<EventType> getSupportEventTypes() { return Arrays.asList(EventType.FOLLOW); } }
|
redis消息列队
对于生产者和消费者使用redis构造队列
redis可以构造队列和栈等
L开头的是从队头或者说是左边进行操作,R表示从队尾或者右边进行操作
如果想构造队列,那么取和放的操作就要在不同的方向,如果想构造栈,方向就要相同
B开头是带有阻塞功能的,后面的参数可以设置超时时间,如果超时其他客户端就回来操作
超时参数设为 0 表示阻塞时间可以无限期延长(block indefinitely)
BLPOP key1 [key2 ] timeout
取出并获取列表中的第一个元素,或阻塞,直到有可用
BRPOP key1 [key2 ] timeout
取出并获取列表中的最后一个元素,或阻塞,直到有可用
实现
使用lpush实现生产者放队列的功能,因为redis存放对象需要持久化,因此使用json来持久化对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Service public class EventProducer { @Autowired JedisAdapter jedisAdapter; public boolean fireEvent(EventModel eventModel) { try { //持久化对象 String json = JSONObject.toJSONString(eventModel); String key = RedisKeyUtil.getEventQueueKey(); //放入队列 jedisAdapter.lpush(key, json); return true; } catch (Exception e) { return false; } } }
|
消费者是开启一个线程,然后使用brpop从队列中取事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| @Service public class EventConsumer implements InitializingBean, ApplicationContextAware { private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class); //存放着消费者所有能处理的handler private Map<EventType, List<EventHandler>> config = new HashMap<EventType, List<EventHandler>>(); //获得初始化的上下文 private ApplicationContext applicationContext; @Autowired JedisAdapter jedisAdapter; @Override public void afterPropertiesSet() throws Exception { //启动项目的时候用applicationContext获得所有实现EventHandler的类 Map<String, EventHandler> beans = applicationContext.getBeansOfType(EventHandler.class); if (beans != null) { //初始化能处理的handler - config对象 for (Map.Entry<String, EventHandler> entry : beans.entrySet()) { List<EventType> eventTypes = entry.getValue().getSupportEventTypes(); for (EventType type : eventTypes) { if (!config.containsKey(type)) { config.put(type, new ArrayList<EventHandler>()); } config.get(type).add(entry.getValue()); } } } //开启一个线程不同的读取队列中的事件 Thread thread = new Thread(new Runnable() { @Override public void run() { while(true) { String key = RedisKeyUtil.getEventQueueKey(); //从队伍里面取出被持久化的事件,timeout为0时 List<String> events = jedisAdapter.brpop(0, key); for (String message : events) { //跳过自己的key if (message.equals(key)) { continue; } //还原事件 EventModel eventModel = JSON.parseObject(message, EventModel.class); if (!config.containsKey(eventModel.getType())) { logger.error("不能识别的事件"); continue; } //执行事件 for (EventHandler handler : config.get(eventModel.getType())) { handler.doHandle(eventModel); } } } } }); thread.start(); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
|
优先级
上面的队列只是一个普通的队列,还可以设置带有优先级的队列
有两种方案
设置两个队列,一个等级高,一个等级低,一个高优先级一个低优先级的队列。
高优先级任务放到高队列中,低的放在低优先队列中。redis可以要求队列消费者从哪个队列里面先读。
顺序就是命令的顺序
1
| BRPOP queue.task.1 queue.task.2 0
|
假设有个这样的需求,优先级不是简单的高中低或者0-10这些固定的级别。而是类似0-99999这么多级别上面的方案就不合适了。
虽然redis有sorted set这样的可以排序的数据类型,看是很可惜它没有阻塞版的接口。于是我们还是只能使用list类型通过其他方式来完成目的。
有个简单的做法我们可以只设置一个队列,并保证它是按照优先级排序号的。然后通过二分查找法查找一个任务合适的位置,并通过 lset 命令插入到相应的位置。
例如队列里面包含着写优先级的任务[1, 3, 6, 8, 9, 14],当有个优先级为7的任务过来,我们通过自己的二分算法一个个从队列里面取数据出来反和目标数据比对,计算出相应的位置然后插入到指定地点即可。
因为二分查找是比较快的,并且redis本身也都在内存中,理论上速度是可以保证的。但是如果说数据量确实很大的话我们也可以通过一些方式来调优。
例如数据量十万的队列,它们的优先级也是随机0-十万的区间。我们可以设置10个或者100个不同的队列,0-一万的优先级任务投放到1号队列,一万-二万的任务投放到2号队列。这样将一个队列按不同等级拆分后它单个队列的数据就减少许多,这样二分查找匹配的效率也会高一点。但是数据所占的资源基本是不变的,十万数据该占多少内存还是多少。只是系统里面多了一些队列而已。
可靠性
一般来讲,设计消息队列的整体思路是先build一个整体的数据流,例如producer发送给broker,broker发送给consumer,consumer回复消费确认,broker删除/备份消息等。
利用RPC将数据流串起来。然后考虑RPC的高可用性,尽量做到无状态,方便水平扩展。
之后考虑如何承载消息堆积,然后在合适的时机投递消息,而处理堆积的最佳方式,就是存储,存储的选型需要综合考虑性能/可靠性和开发维护成本等诸多因素。
为了实现广播功能,我们必须要维护消费关系,可以利用zk/config server等保存消费关系。
在完成了上述几个功能后,消息队列基本就实现了。然后我们可以考虑一些高级特性,如可靠投递,事务特性,性能优化等。
缺点
1)redis崩溃的时候队列功能失效
2)如果入队端一直在塞数据,而出队端没有消费数据,或者是入队的频率大而多,出队端的消费频率慢会导致内存暴涨
4)假如有多个消费者同时监听一个队列,其中一个出队了一个元素,另一个则获取不到该元素
5)Redis的队列应用场景是一对多或者一对一的关系,即有多个入队端,但是只有一个消费端(出队)
java中的消息队列
BlockingQueue
在java中可以使用BlockingQueue来实现消息队列,但是效率不是非常高,因为其内部不是无锁方式。
Disruptor
Disruptor可以实现高性能生产者和消费者模式
Diruptor 页面:https://github.com/LMAX-Exchange/disruptor
待完成
https://zhuanlan.zhihu.com/p/21355046
https://www.bittiger.io/classpage/QHkP5QobvhNWGZv9f
http://coolshell.cn/articles/9169.html