Welcome to xlongwei.com

欢迎大家一起学习、交流、分享


QQ:9167702333 邮箱:admin@xlongwei.com

TaskUtil 线程池任务调度和定时任务工具


分类 Java   关键字 分享   标签 java   algorithm   发布 hongwei  1427639540641
注意 转载须保留原文链接,译文链接,作者译者等信息。  
我要赚钱:注册财富箱,享投资12%收益!


TaskUtil 封装了 ExecutorService 和 ScheduledExecutorService,分别提供线程池任务调度和定时任务调度接口;支持监视任务并保持其持续运行,支持注册 shutdown hook(关闭钩子),支持任务完成后的回调接口。

/**
 * Static helper around one shared {@link ExecutorService} (worker pool) and one shared
 * {@link ScheduledExecutorService} (timer pool).
 *
 * <p>Features: immediate task submission, delayed / periodic scheduling, "keep running" tasks
 * that are re-submitted once they finish, completion callbacks, and user-registered shutdown
 * hooks that are executed when {@link #shutdown()} runs (it is also registered as a JVM
 * shutdown hook).
 *
 * <p>All pool threads are daemon threads with minimum priority. Note that the worker pool is a
 * fixed pool of 9 threads despite the field name {@code cachedExecutor}; long-running
 * "keep running" tasks permanently occupy one of those threads each.
 */
public class TaskUtil {
	private static final Logger logger = LoggerFactory.getLogger(TaskUtil.class);
	/** Upper bound (seconds) that {@link #shutdown()} waits for submitted shutdown hooks. */
	private static final long HOOK_TIMEOUT_SECONDS = 5;
	private static final ExecutorService cachedExecutor = Executors.newFixedThreadPool(9, new TaskUtilThreadFactory("cached"));
	private static final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(3, new TaskUtilThreadFactory("scheduled"));
	/** Tasks that must be re-submitted when they finish; guarded by its own monitor. */
	private static final Map<Runnable, Future<?>> keepRunningTasks = new HashMap<Runnable, Future<?>>();
	/** Futures whose result must be handed to a callback; guarded by its own monitor. */
	private static final Map<Future<?>, Callback> callbackTasks = new HashMap<Future<?>, Callback>();
	/** Registered shutdown hooks; guarded by its own monitor (also guards shutdownStarted). */
	private static final List<Object> shutdownHooks = new LinkedList<Object>();
	/** True once the periodic monitor has been scheduled; guarded by the TaskUtil class lock. */
	private static boolean monitorStarted = false;
	/** True once shutdown() has begun; guarded by the shutdownHooks monitor. */
	private static boolean shutdownStarted = false;
	static {
		Runtime.getRuntime().addShutdownHook(new Thread() {
			@Override
			public void run() {
				TaskUtil.shutdown();
			}
		});
	}
	
	/**
	 * Shuts TaskUtil down; normally there is no need to call this manually because it is
	 * registered as a JVM shutdown hook.
	 *
	 * <p>Runs every registered shutdown hook ({@code Closeable}s are closed inline,
	 * {@code Runnable}/{@code Callable} hooks are submitted to the worker pool), waits up to
	 * {@value #HOOK_TIMEOUT_SECONDS} seconds in total for the submitted hooks, then stops both
	 * executors. The method is idempotent: only the first call does any work, so a manual call
	 * followed by the JVM hook does not try to submit to an already stopped pool.
	 */
	public static void shutdown() {
		List<Object> hooks;
		synchronized (shutdownHooks) {
			if(shutdownStarted) {
				return;
			}
			shutdownStarted = true;
			// Work on a snapshot so hooks never run while the lock is held.
			hooks = new LinkedList<Object>(shutdownHooks);
		}
		
		List<Future<?>> pendingHooks = new LinkedList<Future<?>>();
		for(Object shutdownHook : hooks) {
			logger.info("shutdown: "+shutdownHook);
			try {
				if(shutdownHook instanceof Closeable) {
					((Closeable)shutdownHook).close();
				}else if(shutdownHook instanceof Runnable) {
					// A Thread is a Runnable too, so its run() executes on a pool thread.
					pendingHooks.add(TaskUtil.submit((Runnable)shutdownHook));
				}else if(shutdownHook instanceof Callable) {
					pendingHooks.add(TaskUtil.submit((Callable<?>)shutdownHook));
				}
			}catch(IOException e) {
				logger.warn("fail to shutdown Closeable: "+shutdownHook, e);
			}catch(RuntimeException e) {
				// One broken hook must not prevent the remaining hooks from running.
				logger.warn("fail to run shutdown hook: "+shutdownHook, e);
			}
		}
		
		// Give submitted hooks a bounded chance to finish before the pools are interrupted;
		// without this, shutdownNow() below would cancel hooks that have not run yet.
		long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(HOOK_TIMEOUT_SECONDS);
		for(Future<?> pending : pendingHooks) {
			try {
				pending.get(Math.max(0L, deadline - System.nanoTime()), TimeUnit.NANOSECONDS);
			}catch(InterruptedException e) {
				Thread.currentThread().interrupt();
				break;
			}catch(Exception e) {
				logger.warn("shutdown hook did not complete normally", e);
			}
		}
		
		scheduledExecutor.shutdown();
		cachedExecutor.shutdown();
		if(!scheduledExecutor.isTerminated()) scheduledExecutor.shutdownNow();
		if(!cachedExecutor.isTerminated()) cachedExecutor.shutdownNow();
		logger.info("TaskUtil executors shutdown.");
	}
	
	/**
	 * Registers an object to be processed by {@link #shutdown()}.
	 *
	 * @param shutdownHook one of:
	 * <ul>
	 * <li>{@code Closeable} - closed directly in the thread calling shutdown()
	 * <li>{@code Runnable} or {@code Callable} - submitted to the worker pool
	 * <li>{@code Thread} - handled as a {@code Runnable} (its run() executes on a pool thread)
	 * </ul>
	 * @return true if the hook has a supported type and was registered; false otherwise
	 *         (including for {@code null})
	 */
	public static boolean addShutdownHook(Object shutdownHook) {
		boolean validShutdownHook = shutdownHook instanceof Closeable
				|| shutdownHook instanceof Runnable
				|| shutdownHook instanceof Callable;
		if(validShutdownHook) {
			synchronized (shutdownHooks) {
				shutdownHooks.add(shutdownHook);
			}
		}
		return validShutdownHook;
	}

	/**
	 * Submits a task to the worker pool for immediate execution.
	 *
	 * @return the future of the submitted task
	 */
	public static Future<?> submit(Runnable task) {
		return cachedExecutor.submit(task);
	}
	
	/**
	 * Submits a task and keeps it running: a monitor checks once per minute and re-submits
	 * the task whenever its latest future is done.
	 *
	 * @return the future of the first submission (later re-submissions create new futures)
	 */
	public static Future<?> submitKeepRunning(Runnable task){
		Future<?> future = submit(task);
		synchronized (keepRunningTasks) {
			keepRunningTasks.put(task, future);
		}
		ensureMonitorStarted();
		return future;
	}
	
	/**
	 * Runs a task once after a delay, e.g. after 5 seconds:
	 * {@code schedule(task, 5, TimeUnit.SECONDS)}.
	 *
	 * <p>The timer thread only hands the task over to the worker pool; the resulting future
	 * is discarded, so exceptions thrown by the task are not reported here.
	 */
	public static void schedule(Runnable task, long delay, TimeUnit unit) {
		scheduledExecutor.schedule(new ScheduleTask(task), delay, unit);
	}
	
	/**
	 * Runs a task once at the given time, e.g. at 2 PM today:
	 * {@code scheduleAt(task, DateUtils.setHours(new Date(), 14))}.
	 * If the time is not in the future, the task is scheduled 3 milliseconds from now.
	 */
	public static void scheduleAt(Runnable task, Date time) {
		long mills = time.getTime() - System.currentTimeMillis();
		schedule(task, mills>0 ? mills : 3, TimeUnit.MILLISECONDS);
	}
	
	/**
	 * Runs a task repeatedly at a fixed rate, e.g. first after 5 seconds and then every
	 * 10 minutes:
	 * {@code scheduleAtFixedRate(task, 5, TimeUnit.MINUTES.toSeconds(10), TimeUnit.SECONDS)}.
	 *
	 * @param initialDelay delay before the first run
	 * @param delay period between successive runs
	 * @param unit unit of both {@code initialDelay} and {@code delay}
	 */
	public static void scheduleAtFixedRate(Runnable task, long initialDelay, long delay, TimeUnit unit) {
		// Fixed rate, as the method name promises. The wrapped task returns immediately
		// (it only forwards to the worker pool), so runs do not pile up on the timer thread.
		scheduledExecutor.scheduleAtFixedRate(new ScheduleTask(task), initialDelay, delay, unit);
	}
	
	/**
	 * Runs a task repeatedly starting at the given time, e.g. from 2 PM today, once per hour:
	 * {@code scheduleAtFixtRate(task, DateUtils.setHours(new Date(), 14), 1, TimeUnit.HOURS)}.
	 * If the start time is not in the future, the first run is scheduled 3 milliseconds from
	 * now. (The method name keeps its historical spelling for compatibility.)
	 */
	public static void scheduleAtFixtRate(Runnable task, Date time, long delay, TimeUnit unit) {
		long mills = time.getTime() - System.currentTimeMillis();
		scheduleAtFixedRate(task, mills>0 ? mills : 3, unit.toMillis(delay), TimeUnit.MILLISECONDS);
	}
	
	/**
	 * Submits a value-returning task to the worker pool.
	 *
	 * @return the future carrying the task's result
	 */
	public static <T> Future<T> submit(Callable<T> task) {
		return cachedExecutor.submit(task);
	}
	
	/**
	 * Submits a value-returning task and registers a callback for its result.
	 *
	 * <p>The callback is invoked on a worker thread by the once-per-minute monitor after the
	 * task has completed successfully. If the task fails or is cancelled, the failure is
	 * logged once and the callback is not invoked.
	 *
	 * @param callback receiver of the result; may be {@code null} for "no callback"
	 * @return the future carrying the task's result
	 */
	public static <T> Future<T> submit(Callable<T> task, Callback callback) {
		Future<T> future = submit(task);
		if(callback != null) {
			synchronized (callbackTasks) {
				callbackTasks.put(future, callback);
			}
			ensureMonitorStarted();
		}
		return future;
	}
	
	/**
	 * Submits a task and blocks until its result is available.
	 *
	 * @return the task's result, or {@code null} if the task failed or the wait was
	 *         interrupted (the failure is logged; the interrupt flag is restored)
	 */
	public static <T> T wait(Callable<T> task) {
		Future<T> future = cachedExecutor.submit(task);
		try {
			return future.get();
		} catch (InterruptedException e) {
			// Preserve the interrupt for the caller instead of silently consuming it.
			Thread.currentThread().interrupt();
			logger.warn("fail to wait task: "+task, e);
			return null;
		} catch (Exception e) {
			logger.warn("fail to wait task: "+task, e);
			return null;
		}
	}
	
	/**
	 * Schedules the once-per-minute monitor on first use; later calls are no-ops.
	 * Synchronized so concurrent first callers cannot schedule it twice.
	 */
	private static synchronized void ensureMonitorStarted() {
		if(monitorStarted) return;
		scheduleAtFixedRate(new CachedTasksMonitor(), 1, 1, TimeUnit.MINUTES);
		monitorStarted = true;
	}
	
	/**
	 * Periodic monitor: re-submits finished "keep running" tasks and dispatches results of
	 * completed futures to their callbacks.
	 */
	private static class CachedTasksMonitor implements Runnable {
		@Override
		public void run() {
			restartFinishedTasks();
			dispatchCallbacks();
		}
		
		/** Re-submits every keep-running task whose latest future is done. */
		private void restartFinishedTasks() {
			synchronized (keepRunningTasks) {
				for(Map.Entry<Runnable, Future<?>> entry : keepRunningTasks.entrySet()) {
					if(entry.getValue().isDone()) {
						// setValue is not a structural change, so it is safe while iterating.
						entry.setValue(TaskUtil.submit(entry.getKey()));
					}
				}
			}
		}
		
		/** Hands results of completed futures to their callbacks and forgets those futures. */
		private void dispatchCallbacks() {
			synchronized (callbackTasks) {
				List<Future<?>> finished = new LinkedList<Future<?>>();
				for(Map.Entry<Future<?>, Callback> entry : callbackTasks.entrySet()) {
					Future<?> future = entry.getKey();
					if(!future.isDone()) continue;
					
					// Remove the future whether or not it succeeded; a failed or cancelled
					// future would otherwise be retried and logged every minute forever.
					finished.add(future);
					final Callback callback = entry.getValue();
					try{
						final Object result = future.get(5, TimeUnit.SECONDS);
						TaskUtil.submit(new Runnable() {
							@Override
							public void run() {
								callback.handle(result);
							}
						});
					}catch (Exception e) {
						if(e instanceof InterruptedException) Thread.currentThread().interrupt();
						logger.warn("TaskUtil callbackedTasks warn: ", e);
					}
				}
				
				for(Future<?> future : finished) {
					callbackTasks.remove(future);
				}
			}
		}
	}
	
	/**
	 * Thread factory producing daemon, minimum-priority threads named
	 * {@code TaskUtil-<n>-<prefix>}, where {@code n} is a counter shared by all factories.
	 */
	private static class TaskUtilThreadFactory implements ThreadFactory {
		private final static AtomicInteger taskutilThreadNumber = new AtomicInteger(1);
		private final String threadNamePrefix;
		TaskUtilThreadFactory(String threadNamePrefix){
			this.threadNamePrefix = threadNamePrefix;
		}
		
		@Override
		public Thread newThread(Runnable r) {
			Thread t = new Thread(r, String.format("TaskUtil-%d-%s", taskutilThreadNumber.getAndIncrement(), this.threadNamePrefix));
			t.setDaemon(true);
			t.setPriority(Thread.MIN_PRIORITY);
			return t;
		}
	}
	
	/**
	 * Wrapper for scheduled tasks: each time the timer fires, the real task is handed to the
	 * worker pool, so the timer thread itself spends almost no time per run.
	 * @author hongwei
	 * @date 2014-09-05
	 */
	private static class ScheduleTask implements Runnable {
		private final Runnable runner;
		public ScheduleTask(Runnable runnable) {
			this.runner = runnable;
		}
		@Override
		public void run() {
			TaskUtil.submit(runner);
		}
	}
	
	/**
	 * Callback receiving the result of a task submitted via
	 * {@link TaskUtil#submit(Callable, Callback)}.
	 */
	public static interface Callback {
		/** Called on a worker thread with the value returned by the completed task. */
		void handle(Object result);
	}
}