GaoYida


  • 首页

  • 分类

  • 归档

Spring之Schedule的调度实现

发表于 2019-11-24 | 分类于 Spring

序言

  日常开发中,经常会接触定时任务,比如数据同步,数据的聚合,报表发送等等。目前常用的调度第三方库有Spring-Schedule,Quartz等,本文主要介绍Spring-Schedule的调度实现。

示例

  首先pom里引入依赖:

1
2
3
4
5
6
7
8
9
10
11
12
<!--spring-boot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>1.5.12.RELEASE</version>
</dependency>
<!--spring-framework-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.16.RELEASE</version>
</dependency>

  下图的代码片段是一个使用Spring库进行调度的例子,对调度的service方法配置cron表达式实现定时执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@SpringBootApplication
@EnableScheduling
public class SchduleApplication {
public static void main(String[] args) {
SpringApplication.run(SchduleApplication.class, args);
}
@Scheduled(cron = "0/5 * * * * *")
public void service() throws InterruptedException {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(Thread.currentThread().getName() + " start at " + df.format(new Date()));
Thread.sleep(1 * 1000);
System.out.println(Thread.currentThread().getName() + " end at " + df.format(new Date()));
}
}

设计与实现

  @EnableScheduling注解里注册了ScheduledAnnotationBeanPostProcessor,该BeanPostProcessor里有如下几个重要方法:

  1. postProcessAfterInitialization
  2. finishRegistration
  3. postProcessBeforeDestruction

任务采集 — postProcessAfterInitialization

  在实例化出Bean,并对Bean进行初始化后,会进入该postProcess方法,对Bean里@Scheduled注解的方法进行解析。(注意同一个方法可能会有多个@Scheduled注解)按照@Scheduled的配置会包装成不同的Task,比如CronTask、IntervalTask。ScheduledTaskRegistrar会对任务容器启动中收集的任务进行暂存,在容器刷新事件发布后,用来启动任务。这里列出了Task的类图:

任务注册与启动 — finishRegistration

  Spring应用启动完成后,会发布ContextRefreshedEvent,ScheduledAnnotationBeanPostProcessor在onApplicationEvent方法里会对Bean实例化期间收集的调度任务进行启动。
  任务的注册启动涉及三个组件:

  1. Task (调度方法/Bean的包装类-ScheduledMethodRunnable)
  2. Trigger (调度表达式包装类-CronTrigger/PeriodicTrigger)
  3. ScheduledExecutorService (调度线程池-JDK实现)

  任务主要分为两种:间隔执行任务,Cron执行任务。间隔执行任务会以如下方式加入到调度线程池执行:

1
ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);

  JDK里ScheduledThreadPoolExecutor的实现是,注册任务时将下一次触发时相对于当前的延迟时间计算出来,放入DelayedWorkQueue,线程池里的线程会不停去检查DelayedWorkQueue里可以执行的任务,取出来执行。(即调度线程池默认会保证至少存在1个核心线程,且不允许超时退出)。对于周期性任务,当前执行完成后,会计算下一次执行的延迟时间,再次加入DelayedWorkQueue。
  对于Cron执行任务,CronTrigger对Cron表达式实现了解析,用以获取nextExecutionTime。具体实现如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public ScheduledFuture<?> schedule() {
synchronized (this.triggerContextMonitor) {
this.scheduledExecutionTime = this.trigger.nextExecutionTim(this.triggerContext);//根据cron表达式获取下一任务执行时间戳
if (this.scheduledExecutionTime == null) {
return null;
}
long initialDelay = this.scheduledExecutionTime.getTime() -System.currentTimeMillis();//计算任务执行延迟
this.currentFuture = this.executor.schedule(this, initialDelay,TimeUnit.MILLISECONDS);//放入调度线程池
return this;
}
}
@Override
public void run() {
Date actualExecutionTime = new Date();
super.run();//执行任务,注意这里是同步执行,如果执行时间较长,可能会出现跳过未来应该执行的时刻。
Date completionTime = new Date();
synchronized (this.triggerContextMonitor) {
this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
if (!this.currentFuture.isCancelled()) {
schedule();//再调度
}
}
}

  在上述的分析里,无论JDK里还是Spring都是同步去执行任务,如果任务执行的时间过长,可能会错过下一次的执行时机。那是否可以支持执行呢,答案是肯定的,Spring-Schedule提供了异步支持。

任务异步执行 — EnableAsync

1
2
3
4
5
6
7
8
9
10
11
12
@Component
@EnableAsync
public class ScheduledService {
@Async
@Scheduled(cron = "0/5 * * * * *")
public void service() throws InterruptedException {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(Thread.currentThread().getName() + " start at " + df.format(new Date()));
Thread.sleep(6 * 1000);
System.out.println(Thread.currentThread().getName() + " end at " + df.format(new Date()));
}
}

  如上是异步执行的例子。EnableAsync注解会注册AsyncAnnotationBeanPostProcessor,该BeanPostProcessor会对@Async注解的Bean进行代理,并编织Advice——AnnotationAsyncExecutionInterceptor。在异步任务的默认实现里,执行任务时会创建SimpleAsyncTaskExecutor去执行。SimpleAsyncTaskExecutor的执行方法:

1
2
3
4
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}

可以看到,每次任务执行时会创建新的线程。注意,实际应用的时候,要考虑任务量和执行时间,以及合理的线程数。

自定义业务逻辑

  对于同步或者异步执行的结果如何收集起来呢?比如执行的结果需要推送到消息队列应该怎么实现呢?如果要进行业务抢占呢?

  • 解决方案
    既然Spring-Scheduler的异步实现中能对Bean进行功能编织,我们亦可以对方法进行推送和抢占逻辑的增强。

任务销毁 — postProcessBeforeDestruction

  当应用停掉的时候,任务也应该进行相应的关闭。以下为调度任务的取消:

1
2
3
4
5
6
7
8
9
10
11
12
13
//cancel线程池中的任务,并关闭线程池。
public void destroy() {
synchronized (this.scheduledTasks) {
Collection<Set<ScheduledTask>> allTasks = this.scheduledTasks.values();
for (Set<ScheduledTask> tasks : allTasks) {
for (ScheduledTask task : tasks) {
task.cancel();
}
}
this.scheduledTasks.clear();
}
this.registrar.destroy();
}

总结

  本文梳理了Spring的调度实现,但该调度实现为单机的,很多场景下应用是集群部署的,任务还需要抢占执行。分布式的调度实现会在后续的博文中继续分析。

分布式发号器-Fly

发表于 2019-11-15 | 分类于 分布式应用

引言

  前不久看到一篇文章,如何做一个靠谱的发号器,觉得是个挺好的设计,于是使用Java实现了一番。

Fly

  发号器,取名fly的意思是,像有无数飞虫一样,不断产生,取之不尽。查看项目地址。

持久化

  UID的生成记录会写入ZooKeeper,服务初始化/重启时从ZooKeeper读取配置信息。使用初始化作为持久化的原因是,ZooKeeper是强一致的,在全局单调ID的生成模式下,如果主服务器宕机,可以切换备服务器工作,备服务器从ZooKeeper读取配置,保证发出的ID是唯一的。
  如果采用MySQL和Redis产生ID的实现,由于MySQL和Redis的复制机制无法保证强一致,当MySQL或Redis发生主备切换,备机尚未完全同步的话,还是会发出重复的ID。

通信协议

  实现TLV格式的协议。

高可用方案设计

  对于全局单调ID的生成,需要使用主备模式;对于全局唯一ID的生成,主备模式和负载均衡模式均可实现。在实际使用时,我们倾向于全局单调ID使用主备模式,全局唯一ID使用负载均衡模式部署服务。

主备模式

架构图

选主

  部署服务的实例会在zk进行master节点抢占,抢占成功的实例会成为master,执行id的生成和分配。其他服务器会成为备服务器,执行客户端请求的转发,保障高可用。

ID的生成规则

  全局单调性,目前实现是一个不断递增的整数。

主备切换

  当前运行的服务端会写到zk里,如果异常/下线将触发事件通知其他运行实例进行抢占执行。注意,因为服务端实现了转发,对客户端来说是无感知的,当客户端连接原master异常时,会自动尝试连接其他服务地址。

缺点

  服务非负载均衡。

负载均衡模式

架构图

部署

  所有服务实例都是平等的对外提供服务。没有主从之分。

ID的生成规则

  基于Twitter的SnowFlake算法,具体规则:参考

服务宕机

  客户端自动尝试连接其他服务地址。

缺点

  UUID的取用非全局单调。

主要代码

FlyService

  接收处理客户端请求,默认端口是8888。对于当前非master的服务实例,将执行转发请求到master服务实例。

ForwardService

  提供转发服务。具体实现是维护到master的连接池,同时当有请求发出的时候,将异步结果获取转为同步。

StateMachine

  维护当前服务实例的状态,是否是master,是否avaliable等。当有master状态转变时,需要实时更新资源。

FlyManager

  负责与zk的交互,执行master抢占和监听。

TODO

  1. 考虑到如果zk服务出现异常,我们可以降级到通过数据库实现UUID生成。
  2. 转发协议,压测转发处理的性能,以及连接池的优化;客户端连接管理。
  3. 完成客户端实现。

Mybatis之Possible unexpected auto-mapping

发表于 2018-12-17 | 分类于 Mybatis

  最近在做项目改造,将项目改造成SpringBoot应用。主要工作规整旧的配置,调整部分代码结构,对项目的代码未做变动,完成后提交给QA做回归验证。
  就在上周,测试同学将我唤了过去,说有页面请求接口报500。看到现象顿感疑惑,首先我并未修改之前的代码,查看线上master分支是运行正常的,是否是测试环境有脏数据?或者登陆异常?
  查看测试环境的日志,代码有一处抛了空指针,为了简化描述,还原当时的代码场景如下:

1
2
3
4
105 for (Account account : accounts) {
106 User user = userService.findById(account.getUserId());
107 String userId = user.getName() + "(" + account.getUserId() + ")";
108 }

  日志分析空指针是在第107行的 user.getName() 抛出的。一眼看去,的确未对user是否为空作判断,遂执行查询数据库的SQL,看是否有account和user数据不一致的情况,结果user均可查到!让我们看下AccountMapper.xml里的查询:

1
2
3
4
5
6
7
8
9
10
11
<resultMap id="BaseResultMap" type="com.xxx.User">
<id column="id" jdbcType="INTEGER" property="id" />
<result column="user_id" jdbcType="VARCHAR" property="userId" />
</resultMap>
<select id="getAccountDetail" resultMap="BaseResultMap">
SELECT
id,
user_id as userId
from account
</select>

  这样看,userId拿出来肯定是空的呀,我们使用的对象里属性是userId,为什么线上master分支还能正确执行?(当时怀疑合代码的问题,后查看master分支代码是一致的)。
  为什么相同的代码跑出了不同的结果,还和我预想的结果刚好相反。(猜想是master运行应该也会报错才对)。不对,还是可能存在不同的代码,就是我引入的jar包。在原来的Web工程里,引入的mybais是:

1
2
3
4
5
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.4.2</version>
<dependency>

当前springboot应用引入的依赖里,使用的mybatis是3.4.5。

1
2
3
4
5
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.1</version>
<dependency>

  难道是这两个版本之间的改动?可能别人也遇到过这样的问题。于是到GitHub的mybatis项目上看3.4.3到3.4.5的发布日志,发现了蛛丝马迹:

  我们由此知道在3.4.2出现的现象是个bug,3.4.3版本修复了这个问题。问题已经找到。解决办法呢?是修改原有代码还是将错就错?由于原版本一直良好运行,以及提升版本也可能出现其他的问题,于是选择继续沿用3.4.2版本。
  坑就如同上述,但是导致这一问题的原因是什么,PR #895 是如何修复这个bug的呢?
  我们需要对源码作进一步分析。对这个bug的修复主要涉及两个文件——DefaultResultSetHandler.java和ResultMap.java。DefaultResultSetHandler处理SQL查询结果集,生成结果列表。通过debug看到结果映射调用堆栈是(代码版本mybatis 3.4.2):

  • loadMappedAndUnmappedColumnNames
  • getUnmappedColumnNames
  • createAutomaticMappings
  • applyAutomaticMappings
  • getRowValue
  • handleRowValuesForSimpleResultMap
  • handleRowValues

查看代码片段1,select 出的 userId 因为没有在resultMap里找到column映射,所以被放在unmappedColumnNames里返回。

查看代码片段2,尝试将unmappedColumnNames放入automapping。

查看代码片段3,得到userId的value.

查看代码片段4,代码里先automapping,再使用propertymapping,所以查出的结果userId值获取到了。

  当前的判断逻辑是在unmappedColumnNames中的属性,如果对象里有该属性且有set方法会被自动映射(mybatis自动映射是默认打开的)。实际应该在resultmap里未被映射的情况下才使用自动映射,即存在映射如下,默认的userId -> userId不应该生效。

1
<result column="user_id" jdbcType="VARCHAR" property="userId" />

  所以在修复的版本(mybatis 3.4.3)中在增加resultmap的property记录,并且增加if判断条件。即unmappedColumnNames中的userId在判断时包含在mappedProperties里,不会被自动映射,会遵从xml里的resultmap。

  至此问题得到解决,本文主要围绕问题#895 分析,感谢阅读。

一些记录

发表于 2018-02-17

Don’t let others noise drown out your own inner voice, and most importantly, have the courage to follow your heart and intuition.

对一个优秀工程师最大的致敬就是学习并超过他。

GaoYida

GaoYida

Something Just Like This!

4 日志
3 分类
© 2019 GaoYida
由 Hexo 强力驱动
主题 - NexT.Gemini