Skip to main content
Version: v3

Cloud Queue Guide

Cloud Queue allows you to invoke Cloud Functions outside of Cloud Engine’s context. Based on the existing implementation of Cloud Functions, Cloud Queue makes it possible for you to retry failed function calls, skip duplicate function calls, look up function outputs, delay function calls, and have your Cloud Functions triggered routinely. Queued tasks can be reliably stored in the Cloud Queue of your application and won’t be lost even though the Cloud Engine instances in your application get restarted due to deployments, overloading, and crashing. When these situations happen, the tasks in the queue will be executed once the instances get back to their normal state.

At this time, Cloud Queue is still an experimental feature for you to try for free. It will become a paid feature once it’s officially launched. Since Cloud Queue is designed to handle bursty traffic, you will be billed based on “the number of tasks queued per hour” and “the peak number of the remaining tasks in the queue”.

Cloud Queue runs outside of Cloud Engine and accepts tasks through HTTP requests. It also invokes Cloud Functions hosted on Cloud Engine with HTTP requests so the tasks can be executed. All these features are encapsulated in our SDK. For now, the interface to enqueue a task, Cloud.enqueue, can only be invoked with the masterKey within Cloud Engine (it might be opened up to clients in the future). The interface for checking results, Cloud.getTaskInfo, can be invoked by clients with uniqueId.

Features and Use Cases

Cloud Queue offers the following features:

  • Retry failed tasks If a task fails to run, Cloud Queue will try to run the task again. You can configure how many times a task should be retried (attempts) as well as the interval between every two attempts (backoff). When Cloud Queue retries a task, the uniqueId of the task won’t change.
  • Skip duplicate tasks You can set a uniqueId for each task (if the uniqueId is not provided, a random one will be generated). While the task is in the queue, Cloud Queue won’t accept another task with the same uniqueId.
  • Look up function outputs A task will remain in the queue for a while after it has been completed (set by keepResult). You can look up the result of the task with its uniqueId.
  • Delay tasks You can delay a task by setting delay.
  • Have Cloud Functions triggered routinely For now, scheduled tasks is a subfunction of Cloud Queue. You can create and manage any number of scheduled tasks on the dashboard. You can use CRON expressions or set intervals in seconds.
  • Concurrency control Cloud Queue will store the incoming tasks in a queue and run one of them each time. This prevents Cloud Engine from being overloaded. We will introduce a smarter concurrency controlling algorithm in the future.
  • Priority settings You can set a priority for certain tasks. Cloud Queue will keep running the task with the highest priority in the queue.

You can find demos showing how to use Cloud Engine on leanengine-nodejs-demos.

Basic Usage

const { Cloud } = require("leanengine");

// The Cloud Function being invoked
Cloud.define("closeOrder", async function ({ params }) {
try {
const order = await new Query("Order").get(params.id);
// The returned value can be used to loop up results in the future; it will be logged as well
return await order.save({ status: "closed" });
} catch (err) {
// Throw an error to make the task fail (errors from JavaScript and dependencies will also make the task fail)
throw new Cloud.Error(`Some error happened: ${err.message}`);
}
});

// Add a task; `enqueue` will return a `uniqueId` once the task is added to the queue (the task is not run yet)
// You can use `Cloud.enqueue` in any part of your project, including Cloud Functions and custom routes of your website
const { uniqueId } = await Cloud.enqueue("closeOrder", { id: 1234 });

// Get the result of the task; you can send the `uniqueId` to the client to have the client look up the result
// Only tasks in the queue can be looked up; you can change how long a completed task should stay in the queue with `keepResult`
console.log(await Cloud.getTaskInfo(uniqueId));

// Increasing `attempts` and decreasing `backoff`
Cloud.enqueue("closeOrder", { id: 1234 }, { attempts: 10, backoff: 10000 });

// Add a delayed task; `closeOrder` will run in 1 minute
Cloud.enqueue("closeOrder", { id: 1234 }, { delay: 600000 });

// Specify `uniqueId`; if a task with the same `uniqueId` already exists, an error will be thrown
// If a task is not in the queue anymore, its `uniqueId` will not be compared; you can change how long a completed task should stay in the queue with `keepResult`
Cloud.enqueue("closeOrder", { id: 1234 }, { uniqueId: "1234" });

To make a Cloud Function only able to be invoked by Cloud Queue (rather than a client), add the internal option to Cloud.define. Cloud Functions defined in this way can only be invoked by Cloud Queue and other code using masterKey.

API

Cloud Queue is supported by Node.js SDK 3.4 and above:

Cloud.enqueue(functionName, params, options?): Promise<{uniqueId: string}>
Cloud.getTaskInfo(uniqueId): Promise<TaskInfo>

Usage:

const { Cloud } = require("leanengine");

// Add a task; `enqueue` will return a `uniqueId` once the task is added to the queue (the task is not run yet)
const { uniqueId } = await Cloud.enqueue("sendMail", { userId: 1234 });

// Delay a task
Cloud.enqueue("closeOrder", { id: 1234 }, { delay: 600000 });

// Get the result
console.log(await Cloud.getTaskInfo(uniqueId));

options contains the following properties:

  • attempts?: number: Maximum number of retries; defaults to 1
  • backoff?: number: Interval for each retry (in milliseconds); defaults to 60000 (1 minute)
  • delay?: number: How long to delay (in milliseconds)
  • deliveryMode?: string: What to do when timeout happens; could be atLeastOnce (run at least once but could run multiple times); atMostOnce (run at most once and won’t retry); defaults to atLeastOnce
  • keepResult?: number: How long the task should stay in the queue once it’s done (in milliseconds); defaults to 300000 (5 minutes)
  • priority?: number: Priority; defaults to the current timestamp; change it to a smaller number to have the task run earlier
  • timeout?: number: Timeout in milliseconds; defaults to 15000; should be no more than 15000
  • uniqueId?: string: The unique ID of the task; should be no more than 32 characters; defaults to a random UUID

TaskInfo contains the following properties:

  • uniqueId: string: The unique ID of the task
  • status: string: The status of the task; could be queued (waiting or running), success (completed), or failed

The TaskInfo of a completed task will have:

  • finishedAt?: string The time the task was completed
  • statusCode?: number The HTTP status code of the Cloud Function’s response
  • result?: object The response from the Cloud Function

The TaskInfo of a failed task will have:

  • error?: string Error message
  • retryAt?: string The time the task will be retried for the next time

Performance and Reliability

The interface for enqueueing (Cloud.enqueue) is designed to handle bursty traffic (like above 1000 QPS). Cloud Queue will store all the tasks received and run one of them each time. This helps reduce the burden of your Cloud Engine instances.

Cloud Queue runs outside of Cloud Engine instances, meaning that even though your Cloud Engine instances encounter errors or are restarted, the Cloud Queue won’t be affected. Meanwhile, tasks are executed by the Cloud Functions within Cloud Engine instances, meaning that the environment for running tasks is the same as that for running Cloud Functions.

The Queue

If the concurrency limit is reached, new tasks will be placed into a queue. Whenever there’s a task finished, a task with the highest priority (lowest priority) from the queue will be executed.

The default value of priority is the timestamp the task is enqueued. For example, the timestamp of 2019-05-20T17:32:07.166+08:00 is 1558344727166. This means that if you don’t customize the priority, tasks will be executed according to the time they’re added to the queue. You can change the priority to a smaller number to have a task run earlier, or a larger number for a task to run later.

CRON Expressions

The basic syntax of a CRON expression is:

second minute hour day-of-month month day-of-week
PositionFieldConstraintsRangeSpecial Characters Accepted
1SecondRequired0–59, - * /
2MinuteRequired0–59, - * /
3HourRequired0–23 (0 is midnight), - * /
4Day of monthRequired1–31, - * ? /
5MonthRequired1–12、JAN–DEC, - * /
6Day of weekRequired1–7、SUN–SAT, - ? /

Special characters can be used in the following ways:

CharacterMeaningUsage
*All valuesAll the values a field can have. For example, to run a task every minute, set minutes to *.
?Unspecified valueCan be used on at most one of the two fields that accept this value. For example, to run a task on the 10th of every month regardless of what day it is, set day-of-month to 10 and day-of-week to ?.
-ScopeFor example, setting hours to 10-12 means 10am, 11am, and 12pm.
,Splitting multiple valuesFor example, setting day-of-week to MON,WED,FRI means Monday, Wednesday, and Friday.
/IntervalFor example, setting seconds to */15 means every 15 seconds starting from the 0th, which are the 0th, 15th, 30th, and 45th seconds.

Fields are concatenated with spaces. Values like JANDEC and SUNSAT are case-insensitive (MON is the same as mon).

To illustrate:

ExpressionExplanation
0 */5 * * * ?Run a task every 5 minutes.
10 */5 * * * ?Run a task every 5 minutes and the time to run it is always the 10th second of a minute (like 10:00:10, 10:05:10, etc.).
0 30 10-13 ? * WED,FRIRun a task at 10:30am, 11:30am, 12:30am, and 1:30pm every Wednesday and Friday.
0 */30 8-9 5,20 * ?Run a task every 30 minutes between 8am and 10am (8:00am, 8:30am, 9:00am, and 9:30am) on the 5th and 20th of every month.

The time zone followed by CRON expressions is UTC+8 for China and UTC+0 for other regions.

Restrictions

The following restrictions are applied while we’re testing Cloud Queue:

  • The QPS for enqueueing for each application is 100
  • At most 10000 tasks can be enqueued every day for each application
  • An application can have at most 1000 tasks in the queue

You may contact us if you need to have the restrictions adjusted.

FAQ

What’s your plan for the future?

这里列出的是后续可能会实施的一些计划,如果你非常需要其中某个功能,请通过工单或论坛联系我们,让我们知道。

  • 允许客户端调用 我们有计划为用户提供一个「允许客户端调用云队列」的选项,开启这个选项后客户端将会可以进行入队操作(Cloud.enqueue),但客户端不能指定 options 中的任何选项,只能传递参数。
  • 超时时间 因为之前我们的云函数一直将超时时间限制在 15 秒,所以目前云队列受限于云函数,超时也是 15 秒。我们正在调整相关的基础设施,计划在后续让从云队列调用云函数的超时时间最长可以达到 5 分钟。
  • 并发限制 目前所有应用的并发限制都是 1,我们有计划实现一种自动控制并发的机制:在任务较多的情况下,并发会逐渐增加,直到负载体现在实例的 CPU 或内存压力上。
  • 使用单独的分组运行任务 我们有计划为用户提供一个「在独立分组中运行定时任务」的选项,开启这个选项后会自动创建一个特殊分组,所有定时任务都在这个分组中运行。
  • 定时任务的可编程接口 我们有计划为定时任务提供可编程接口,但优先级较低。
  • 云队列的日志 目前云队列会将执行日志直接打印到云引擎的应用日志中,后续我们准备把云队列的日志显示在一个单独的 Tab 中。
  • 在其他 SDK 中使用云函数 后续我们会在 Python SDK、Java SDK、PHP SDK 中添加云队列的支持。

What situations will be affected by the error handling policy (deliveryMode)?

异常处理策略(deliveryMode)用于在以下几种不确定任务是否已经执行或是否应该执行的情况下,来决定是否重试:

  • 云队列已将请求发到云函数,但云函数未在超时时间内给出成功或失败的响应。
  • 因云队列本身的故障导致失去对正在执行的任务的追踪。
  • 因云队列本身的故障导致定时任务没有在指定的时间触发。

如果选择「放弃(atMostOnce)」在出现上述情况时任务可能不会执行;如果选择「重试(atLeastOnce)」在发生上述情况时任务可能会执行多次。

Can I use Cloud Queue when debugging my project locally?

你可以在本地调试时远程调用云队列相关的 API,但云队列只会在线上的云引擎中运行指定的云函数。