博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
阻塞队列防止并发,线程池单线程处理队列中的任务,应用于spring项目中
阅读量:5921 次
发布时间:2019-06-19

本文共 4976 字,大约阅读时间需要 16 分钟。

hot3.png

一、入口:接收mq发的消息放入队列中

import com.arcvideo.bee.rmq.annotation.Subscribe;import com.arcvideo.iface.resource.library.eventLog.EventLogModel;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;/** * @Auther: zyx. * @Date: 2018/11/12 9:00 */@Slf4j@Componentpublic class MQReceiveData {    private final QueueProcessor queueProcessor;    public MQReceiveData(QueueProcessor queueProcessor) {        this.queueProcessor = queueProcessor;    }    /**     * MQ接收端,队列模式,接收资源发来的消息.     *     * @param: [data]     * @return: void     * @date: 2018/11/12 15:52     */    @Subscribe(Subscribe.Type.QUEUE)    public void receiveBroadcast(EventLogModel data) {        log.info("......SyncModel: Receive data ={}", data);        if (data == null) {            log.error("......SyncModel: Mq send data is null.");            return;        }        try {            queueProcessor.put(data);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

 

二、队列线程池管理,系统初始化就开始检查队列:

import static com.arcvideo.iface.sharedplat.sync.commons.SystemContants.DEFAULT_QUEUE_SIZE;import static com.arcvideo.iface.sharedplat.sync.commons.SystemContants.DEFAULT_THREAD_COUNT;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;import com.arcvideo.iface.resource.library.eventLog.EventLogModel;import com.arcvideo.iface.sharedplat.sync.push.matter.SyncTaskNotifier;import lombok.extern.slf4j.Slf4j;import org.springframework.scheduling.concurrent.CustomizableThreadFactory;/** * @Auther: zyx. * @Date: 2018/11/12 14:25 */@Slf4jpublic class QueueProcessor {    private final BlockingQueue
queue; private final ExecutorService pool; private volatile SyncTaskNotifier notifier; private volatile boolean running = true; public QueueProcessor() { /** * 有界队列,先进先出,内存中顺序存储 */ this.queue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE); /** * 创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程 * 如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中. */ this.pool = Executors.newFixedThreadPool(DEFAULT_THREAD_COUNT, new CustomizableThreadFactory("PushDataProcessor")); } /** * 获取队列属性. * * @return */ public BlockingQueue
getQueue() { return queue; } /** * put方式插入队列. * * @param e * @throws InterruptedException */ public void put(EventLogModel e) throws InterruptedException { if (running) { queue.put(e); } } public void setNotifier(SyncTaskNotifier notifier) { this.notifier = notifier; } /** * 服务器加载Servlet的时候,开始消费 */ public @PostConstruct void init() { pool.execute(new ThreadHandlerDataTask(queue, notifier, running)); } /** * 服务器卸载Servlet的时候运行 */ public @PreDestroy void destroy() throws InterruptedException { this.running = false; this.pool.shutdown(); if (!this.pool.awaitTermination(5, TimeUnit.SECONDS)) { log.warn("......SyncModel: Exceed max timeout for waiting shutdown, ready force close pool."); this.pool.shutdownNow(); } }

 

三、线程处理任务:

import com.arcvideo.iface.resource.library.eventLog.EventLogModel;import com.arcvideo.iface.sharedplat.sync.push.matter.SyncTaskNotifier;import com.arcvideo.iface.sharedplat.sync.push.auth.Session;import com.arcvideo.iface.sharedplat.sync.utils.XinUtils;import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;import java.util.List;import java.util.concurrent.BlockingQueue;/** * @Auther: zyx. * @Date: 2018/11/12 15:40 */@Slf4jpublic class ThreadHandlerDataTask implements Runnable {    private final BlockingQueue
queue; private final SyncTaskNotifier notifier; private volatile boolean running; public ThreadHandlerDataTask(BlockingQueue
queue, SyncTaskNotifier notifier, boolean running) { this.running = running; this.notifier = notifier; this.queue = queue; } @Override public void run() { while (running) { EventLogModel eventLogModel; try { //阻塞取出,如果队列为空一直等待到不空为止,取出第一个之后队列减一. eventLogModel = queue.take(); } catch (InterruptedException e) { log.error("......SyncModel: Can't take from queue cause by InterruptedException."); break; } try {                //...开始去干活操作... 业务层去实现notfier接口                notifier.handlerTask(); } catch (Exception e) { log.error("......SyncModel: Current task error" + e.getMessage(), e); } } }}

转载于:https://my.oschina.net/zhangyaxin/blog/2961000

你可能感兴趣的文章
我所知道的前端组件化与模块化
查看>>
番茄花园病毒
查看>>
自然语言处理的6大法宝
查看>>
【HEVC学习与研究】40、X265的下载和编译
查看>>
摆脱缠斗 销售易推出PaaS平台“放长线钓大鱼”
查看>>
Yii2 理解Object
查看>>
您那些“高大上”的网络安全设备发挥最大能效了么?
查看>>
中外超算高峰论坛详解“太湖之光”戈登贝尔奖入围应用
查看>>
中国实现了光伏产品"白菜价",让美国企业纷纷倒闭?
查看>>
MobileIron:移动业务普及 需重新思考安全性
查看>>
程序员那些事:在家办公赚的更多
查看>>
14年优质服务 海科融通进军P2P资金托管
查看>>
CSS十问——好奇心+刨根问底=CSSer
查看>>
希捷与合作伙伴合作解决无人机数据需求
查看>>
不想“打破互联网”?你需要更安全的DNS
查看>>
倪光南:自主创新的华为市值是靠并购的联想10倍
查看>>
《SEO的艺术(原书第2版)》——1.1 搜索引擎的任务
查看>>
欧盟将限制16岁以下孩童用社交网络 需家长同意
查看>>
Redis Web界面管理工具
查看>>
第13代PowerEdge强劲升级 五大独有技术是什么?
查看>>