在开发中,往往会遇到一些关于延时任务的需求。例如
生成订单30分钟未支付,则自动取消
生成订单60秒后,给用户发短信
对上述的任务,我们给一个专业的名字来形容,那就是延时任务。那么这里就会产生一个问题,这个延时任务和定时任务的区别究竟在哪里呢?一共有如下几点区别
定时任务有明确的触发时间,延时任务没有
定时任务有执行周期,而延时任务在某事件触发后一段时间内执行,没有执行周期
定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务
下面,我们以判断订单是否超时为例,进行方案分析
方案分析
(1)数据库轮询
在小型项目中使用,即通过一个线程定时的去扫描数据库,通过订单时间来判断是否有超时的订单,然后进行update或delete等操作
优点:简单易行,支持集群操作
缺点: (1)对服务器内存消耗大
(2)存在延迟,比如你每隔3分钟扫描一次,那最坏的延迟时间就是3分钟
(3)假设你的订单有几千万条,每隔几分钟这样扫描一次,数据库损耗极大
(2)JDK的延迟队列
方案是利用JDK自带的DelayQueue来实现,这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入DelayQueue中的对象,是必须实现Delayed接口的。
DelayedQueue实现工作流程如下图所示:
其中Poll():获取并移除队列的超时元素,没有则返回空
take():获取并移除队列的超时元素,如果没有则wait当前线程,直到有元素满足超时条件,返回结果。
实现:定义一个类OrderDelay实现Delayed,代码如下
package com.rjzheng.delay2;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class OrderDelay implements Delayed {
private String orderId;
private long timeout;
OrderDelay(String orderId, long timeout) {
this.orderId = orderId;
this.timeout = timeout + System.nanoTime();
}
public int compareTo(Delayed other) {
if (other == this)
return 0;
OrderDelay t = (OrderDelay) other;
long d = (getDelay(TimeUnit.NANOSECONDS) - t
.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d ZADD page_rank 9 baidu.com 8 bing.com
(integer) 2
redis> ZRANGE page_rank 0 -1 WITHSCORES
1) "bing.com"
2) "8"
3) "baidu.com"
4) "9"
5) "google.com"
6) "10"
# 查询元素的score值
redis> ZSCORE page_rank bing.com
"8"
# 移除单个元素
redis> ZREM page_rank google.com
(integer) 1
redis> ZRANGE page_rank 0 -1 WITHSCORES
1) "bing.com"
2) "8"
3) "baidu.com"
4) "9"
那么如何实现呢?我们将订单超时时间戳与订单号分别设置为score和member,系统扫描第一个元素判断是否超时,具体如下图所示
实现一
package com.rjzheng.delay4;
import java.util.Calendar;
import java.util.Set;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Tuple;
public class AppTest {
private static final String ADDR = "127.0.0.1";
private static final int PORT = 6379;
private static JedisPool jedisPool = new JedisPool(ADDR, PORT);
public static Jedis getJedis() {
return jedisPool.getResource();
}
//生产者,生成5个订单放进去
public void productionDelayMessage(){
for(int i=0;i= score){
String orderId = ((Tuple)items.toArray()[0]).getElement();
jedis.zrem("OrderId", orderId);
System.out.println(System.currentTimeMillis() +"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);
}
}
}
public static void main(String[] args) {
AppTest appTest =new AppTest();
appTest.productionDelayMessage();
appTest.consumerDelayMessage();
}
}
此时对应输出如下
可以看到,几乎都是3秒之后,消费订单。
然而,这一版存在一个致命的硬伤,在高并发条件下,多消费者会取到同一个订单号,我们上测试代码ThreadTest
package com.rjzheng.delay4;
import java.util.concurrent.CountDownLatch;
public class ThreadTest {
private static final int threadNum = 10;
private static CountDownLatch cdl = new CountDownLatch(threadNum);
static class DelayMessage implements Runnable{
public void run() {
try {
cdl.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
AppTest appTest =new AppTest();
appTest.consumerDelayMessage();
}
}
public static void main(String[] args) {
AppTest appTest =new AppTest();
appTest.productionDelayMessage();
for(int i=0;i= score){
String orderId = ((Tuple)items.toArray()[0]).getElement();
jedis.zrem("OrderId", orderId);
System.out.println(System.currentTimeMillis()+"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);
}
修改为
if(nowSecond >= score){
String orderId = ((Tuple)items.toArray()[0]).getElement();
Long num = jedis.zrem("OrderId", orderId);
if( num != null && num>0){
System.out.println(System.currentTimeMillis()+"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);
}
}
在这种修改后,重新运行ThreadTest类,发现输出正常了
思路二:该方案使用redis的Keyspace Notifications,中文翻译就是键空间机制,就是利用该机制可以在key失效之后,提供一个回调,实际上是redis会给客户端发送一个消息。是需要redis版本2.8以上。
实现:在redis.conf中,加入一条配置
notify-keyspace-events Ex
运行代码如下
package com.rjzheng.delay5;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;
public class RedisTest {
private static final String ADDR = "127.0.0.1";
private static final int PORT = 6379;
private static JedisPool jedis = new JedisPool(ADDR, PORT);
private static RedisSub sub = new RedisSub();
public static void init() {
new Thread(new Runnable() {
public void run() {
jedis.getResource().subscribe(sub, "[email protected]__:expired");//[email protected]__:expired是redis关于事件监听的东西
}
}).start();
}
public static void main(String[] args) throws InterruptedException {
init();
for(int i =0;i
如若转载,请注明出处:https://www.ozabc.com/hot/373990.html