博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Node.js + Redis Sorted Set 任务队列
阅读量:6715 次
发布时间:2019-06-25

本文共 5975 字,大约阅读时间需要 19 分钟。

需求:功能 A 需要调用第三方 API 获取数据,而第三方 API 自身是异步处理方式,在调用后会返回数据与状态 { data: "查询结果", "status": "正在异步处理中" },这样就需要间隔一段时间后再去调用第三方 API 获取数据。为了用户在使用功能 A 时不会因为第三方 API 正在异步处理中而必须等待,将用户请求加入任务队列中,返回部分数据并关闭请求。然后定时从任务队列里中取出任务调用第三方 API,若返回状态为”异步处理中“,将该任务再次加入任务队列,若返回状态为”已处理完毕“,将返回数据入库。

根据以上问题,想到使用 Node.js + Redis sorted set 来实现任务队列。Node.js 实现自身应用 API 用来接受用户请求,合并数据库已存数据与 API 返回的部分数据返回给用户,并将任务加入到任务队列中。利用 Node.js child process 与 cron 定时从任务队列中取出任务执行。

在设计任务队列的过程中需要考虑到的几个问题

  1. 并行执行多个任务

  2. 任务唯一性

  3. 任务成功或失败后的处理

针对以上问题的解决方案

  1. 并行执行多个任务利用 Promise.all 来实现

  2. 任务唯一性利用 Redis sorted set 来实现。使用时间戳作为分值可以实现将 sorted set 作为 list 来使用,在加入任务时判断任务是否已经存在,在取出任务执行时将该任务分值设置为 0,每次取出分值大于 0 的任务来执行,可以避免重复执行任务。

  3. 执行任务成功后删除任务,执行任务失败后将任务分值更新为当前时间时间戳,这样就可以将失败的任务重新加入任务队列尾部

示例代码

// remote_api.js 模拟第三方 API'use strict';const app = require('express')();app.get('/', (req, res) => {    setTimeout(() => {        let arr = [200, 300];  // 200 代表成功,300 代表失败需要重新请求        res.status(200).send({ 'status': arr[parseInt(Math.random() * 2)] });    }, 3000);});app.listen('9001', () => {    console.log('API 服务监听端口:9001');});
// producer.js 自身应用 API,用来接受用户请求并将任务加入任务队列'use strict';const app = require('express')();const redisClient = require('redis').createClient();const QUEUE_NAME = 'queue:example';function addTaskToQueue(taskName, callback) {    // 先判断任务是否已经存在,存在:跳过,不存在:加入任务队列    redisClient.zscore(QUEUE_NAME, taskName, (error, task) => {        if (error) {            console.log(error);        } else {            if (task) {                console.log('任务已存在,不新增相同任务');                callback(null, task);            } else {                redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {                    if (error) {                        callback(error);                    } else {                        callback(null, result);                    }                });            }        }    });}app.get('/', (req, res) => {    let taskName = req.query['task-name'];    addTaskToQueue(taskName, (error, result) => {        if (error) {            console.log(error);        } else {            res.status(200).send('正在查询中......');        }    });});app.listen(9002, () => {    console.log('生产者服务监听端口:9002');});
// consumer.js 定时获取任务并执行'use strict';const redisClient = require('redis').createClient();const request = require('request');const schedule = require('node-schedule');const QUEUE_NAME = 'queue:expmple';const PARALLEL_TASK_NUMBER = 2;  // 并行执行任务数量function getTasksFromQueue(callback) {    // 获取多个任务    redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => {        if (error) {            callback(error);        } else {            // 将任务分值设置为 0,表示正在处理            if (tasks.length > 0) {                let tmp = [];                tasks.forEach((task) => {                    tmp.push(0);                    tmp.push(task);                });                redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => {                    if (error) {                        callback(error);                    } else {                        callback(null, tasks)                    }                });            }        }    });}function addFailedTaskToQueue(taskName, callback) {    redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {        if (error) {            callback(error);        } else {            callback(null, result);        }    });}function removeSucceedTaskFromQueue(taskName, callback) {    redisClient.zrem(QUEUE_NAME, taskName, (error, result) => {        if (error) {            callback(error);        } else {            callback(null, result);        }    })}function execTask(taskName) {    return new Promise((resolve, reject) => {        let requestOptions = {            'url': 'http://127.0.0.1:9001',            'method': 'GET',            'timeout': 5000        };        request(requestOptions, (error, response, body) => {            if (error) {                resolve('failed');                console.log(error);                addFailedTaskToQueue(taskName, (error) => {                    if (error) {                        console.log(error);                    } else {                    }                });            } else {                try {                    body = typeof body !== 'object' ? JSON.parse(body) : body;                } catch (error) {                    resolve('failed');                    console.log(error);                    addFailedTaskToQueue(taskName, (error, result) => {                        if (error) {                            console.log(error);                        } else {                        }                    });                    return;                }                if (body.status !== 200) {                    resolve('failed');                    addFailedTaskToQueue(taskName, (error, result) => {                        if (error) {                            console.log(error);                        } else {                        }                    });                } else {                    resolve('succeed');                    removeSucceedTaskFromQueue(taskName, (error, result) => {                        if (error) {                            console.log(error);                        } else {                        }                    });                }            }        });    });}// 定时,每隔 5 秒获取新的任务来执行let job = schedule.scheduleJob('*/5 * * * * *', () => {    console.log('获取新任务');    getTasksFromQueue((error, tasks) => {        if (error) {            console.log(error);        } else {            if (tasks.length > 0) {                console.log(tasks);                Promise.all(tasks.map(execTask))                .then((results) => {                    console.log(results);                })                .catch((error) => {                    console.log(error);                });                            }        }    });});

转载地址:http://gqelo.baihongyu.com/

你可能感兴趣的文章
自定义格式化字符串
查看>>
bgp发布路由对端无法收到,原因是使用默认网段
查看>>
JQuery实现简单的服务器轮询效果
查看>>
幽灵漏洞(GHOST)影响大量Linux操作系统及其发行版(更新修复方案)
查看>>
Sunday算法
查看>>
netstat
查看>>
优朋普乐:OTT正重构电视版图
查看>>
遇到"process launch failed: Security"问题,解决的一种方法
查看>>
Ubuntu 14.04 LTC 有线网络——网线不识别,灯不亮问题
查看>>
Unity3D DLL加密
查看>>
求数组中最长递增子序列
查看>>
Spring Boot cache backed redis
查看>>
有趣的编程----控制自己电脑的CPU
查看>>
linux的目录结构
查看>>
Java中创建对象的5种不同方法
查看>>
Supervisor安装
查看>>
自建框架知识点一命名空间和自动加载
查看>>
21_css布局2_浮动布局.html
查看>>
DateUtils 单元下的公用函数目录
查看>>
构建高效安全的Nginx Web服务器
查看>>