一、入口:接收 MQ 发来的消息并放入队列:
import com.arcvideo.bee.rmq.annotation.Subscribe;import com.arcvideo.iface.resource.library.eventLog.EventLogModel;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;/** * @Auther: zyx. * @Date: 2018/11/12 9:00 */@Slf4j@Componentpublic class MQReceiveData { private final QueueProcessor queueProcessor; public MQReceiveData(QueueProcessor queueProcessor) { this.queueProcessor = queueProcessor; } /** * MQ接收端,队列模式,接收资源发来的消息. * * @param: [data] * @return: void * @date: 2018/11/12 15:52 */ @Subscribe(Subscribe.Type.QUEUE) public void receiveBroadcast(EventLogModel data) { log.info("......SyncModel: Receive data ={}", data); if (data == null) { log.error("......SyncModel: Mq send data is null."); return; } try { queueProcessor.put(data); } catch (InterruptedException e) { e.printStackTrace(); } }}
二、队列与线程池管理:系统初始化后即开始消费队列:
import static com.arcvideo.iface.sharedplat.sync.commons.SystemContants.DEFAULT_QUEUE_SIZE;import static com.arcvideo.iface.sharedplat.sync.commons.SystemContants.DEFAULT_THREAD_COUNT;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;import com.arcvideo.iface.resource.library.eventLog.EventLogModel;import com.arcvideo.iface.sharedplat.sync.push.matter.SyncTaskNotifier;import lombok.extern.slf4j.Slf4j;import org.springframework.scheduling.concurrent.CustomizableThreadFactory;/** * @Auther: zyx. * @Date: 2018/11/12 14:25 */@Slf4jpublic class QueueProcessor { private final BlockingQueuequeue; private final ExecutorService pool; private volatile SyncTaskNotifier notifier; private volatile boolean running = true; public QueueProcessor() { /** * 有界队列,先进先出,内存中顺序存储 */ this.queue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE); /** * 创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程 * 如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中. */ this.pool = Executors.newFixedThreadPool(DEFAULT_THREAD_COUNT, new CustomizableThreadFactory("PushDataProcessor")); } /** * 获取队列属性. * * @return */ public BlockingQueue getQueue() { return queue; } /** * put方式插入队列. 
* * @param e * @throws InterruptedException */ public void put(EventLogModel e) throws InterruptedException { if (running) { queue.put(e); } } public void setNotifier(SyncTaskNotifier notifier) { this.notifier = notifier; } /** * 服务器加载Servlet的时候,开始消费 */ public @PostConstruct void init() { pool.execute(new ThreadHandlerDataTask(queue, notifier, running)); } /** * 服务器卸载Servlet的时候运行 */ public @PreDestroy void destroy() throws InterruptedException { this.running = false; this.pool.shutdown(); if (!this.pool.awaitTermination(5, TimeUnit.SECONDS)) { log.warn("......SyncModel: Exceed max timeout for waiting shutdown, ready force close pool."); this.pool.shutdownNow(); } }
三、线程处理任务:
import com.arcvideo.iface.resource.library.eventLog.EventLogModel;import com.arcvideo.iface.sharedplat.sync.push.matter.SyncTaskNotifier;import com.arcvideo.iface.sharedplat.sync.push.auth.Session;import com.arcvideo.iface.sharedplat.sync.utils.XinUtils;import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;import java.util.List;import java.util.concurrent.BlockingQueue;/** * @Auther: zyx. * @Date: 2018/11/12 15:40 */@Slf4jpublic class ThreadHandlerDataTask implements Runnable { private final BlockingQueuequeue; private final SyncTaskNotifier notifier; private volatile boolean running; public ThreadHandlerDataTask(BlockingQueue queue, SyncTaskNotifier notifier, boolean running) { this.running = running; this.notifier = notifier; this.queue = queue; } @Override public void run() { while (running) { EventLogModel eventLogModel; try { //阻塞取出,如果队列为空一直等待到不空为止,取出第一个之后队列减一. eventLogModel = queue.take(); } catch (InterruptedException e) { log.error("......SyncModel: Can't take from queue cause by InterruptedException."); break; } try { //...开始去干活操作... 业务层去实现notfier接口 notifier.handlerTask(); } catch (Exception e) { log.error("......SyncModel: Current task error" + e.getMessage(), e); } } }}