【微服务】如何用Azure容器应用Job处理异步HTTP API请求
推荐超级课程:
@TOC
在构建HTTP API时,同步执行长时间运行的任务可能会很有诱惑力。这种方法可能导致响应缓慢、超时和资源耗尽。如果请求超时或连接断开,客户端将不知道操作是否完成。对于CPU密集型任务,这种方法还可能使服务器陷入困境,使其无法响应其他请求。 在本文中,我们将探讨如何使用Azure容器应用构建异步HTTP API。我们将创建一个简单的API,实现异步请求-回复模式:API托管在容器应用中,而异步处理则在Job中完成。这种方法为长时间运行的任务提供了更强大和可扩展的解决方案。
Azure容器应用中的长时间运行API请求
Azure容器应用是一个无服务器容器平台。它非常适合托管各种工作负载,包括HTTP API。 与其他无服务器和PaaS平台一样,Azure容器应用旨在处理短期请求——其入口目前具有4分钟的最大超时时间。作为一个自动扩展平台,它旨在根据传入请求的数量动态扩展。在缩减规模时,会移除副本。如果处理请求的副本被移除,长时间运行的请求可能会突然终止。
Azure容器应用Job
Azure容器应用有两种资源:应用和Job。应用是长时间运行的服务,响应HTTP请求或事件。Job是运行到完成的任务,可以通过计划或事件触发。 Job还可以通过编程方式触发。这使得它们非常适合在HTTP API中实现异步处理。API可以启动Job执行来处理请求,并立即返回响应。Job可以需要多长时间来完成处理。客户端可以轮询应用中的状态端点,以检查Job是否完成并获取结果。
异步请求-回复模式
异步请求-回复是HTTP API处理长时间运行操作的一种常见模式。API不是等待操作完成,而是返回一个状态码,指示操作已经开始。然后,客户端可以轮询API,以检查操作是否完成。 以下是Azure容器应用中该模式的应用方式:
- 客户端向API(作为容器应用托管)发送请求以启动操作。
- API保存请求(我们的示例使用Azure Cosmos DB),启动Job来处理操作,并返回202 Accepted状态码以及指向状态端点的Location头。
- 客户端轮询状态端点。当操作正在进行时,状态端点返回200 OK状态码,以及指示客户端应何时再次轮询的Retry-After头。
- 当操作完成时,状态端点返回303 See Other状态码,以及指向结果的Location头。客户端自动跟随重定向以获取结果。
异步HTTP API应用
您可以在本文上方链接中找到源代码。 该API是一个简单的Node.js应用,使用Fastify。它演示了如何构建一个接受订单并将订单处理卸载到Job的异步HTTP API。该应用有几个简单的端点。
POST /orders
此端点接受其正文中的订单。它将订单保存到Cosmos DB,状态为“待处理”,并启动Job执行以处理订单。
fastify.post('/orders', async (request, reply) => {
const orderId = randomUUID()
// 将订单保存到Cosmos DB
await container.items.create({
id: orderId,
status: 'pending',
order: request.body,
})
// 启动Job执行
await startProcessorJobExecution(orderId)
// 返回202 Accepted和Location头
reply.code(202).header('Location', '/orders/status/' + orderId).send()
})
我们将在本文稍后介绍Job。在上面的代码片段中,startProcessorJobExecution
是一个启动Job执行的功能。它使用Azure容器应用管理SDK来启动Job。
const credential = new DefaultAzureCredential()
const containerAppsClient = new ContainerAppsAPIClient(credential, subscriptionId)
// ...
async function startProcessorJobExecution(orderId) {
// 获取现有Job的模板
const { template: processorJobTemplate } =
await containerAppsClient.jobs.get(resourceGroupName, processorJobName)
// 将订单ID添加到Job的环境变量中
const environmentVariables = processorJobTemplate.containers[0].env
environmentVariables.push({ name: 'ORDER_ID', value: orderId })
const jobStartTemplate = { template: processorJobTemplate }
// 使用修改后的模板启动Job执行
const jobExecution = await containerAppsClient.jobs.beginStartAndWait(
resourceGroupName, processorJobName, {
template: processorJobTemplate,
}
)
}
Job将订单ID作为环境变量。为了设置环境变量,我们使用修改后的模板启动Job执行,该模板包含订单ID。 我们使用托管身份来同时认证Azure容器应用管理SDK和Cosmos DB SDK。
GET /orders/status/:orderId
上一个端点返回202 Accepted状态码和指向此状态端点的Location头。客户端可以轮询此端点以检查订单的状态。 此请求处理程序从Cosmos DB检索订单。如果订单仍处于待处理状态,它返回200 OK状态码和指示客户端应何时再次轮询的Retry-After头。如果订单已完成,它返回303 See Other状态码和指向结果的Location头。
fastify.get('/orders/status/:orderId', async (request, reply) => {
const { orderId } = request.params
// 从Cosmos DB获取订单
const { resource: item } = await container.item(orderId, orderId).read()
if (item === undefined) {
reply.code(404).send()
return
}
if (item.status === 'pending') {
reply.code(200).headers({
'Retry-After': 10,
}).send({ status: item.status })
} else {
reply.code(303).header('Location', '/orders/' + orderId).send()
}
})
GET /orders/:orderId
此端点返回订单处理的结果。状态端点在订单完成时重定向到此资源。它从Cosmos DB检索订单并返回。
fastify.get('/orders/:orderId', async (request, reply) => {
const { orderId } = request.params
// 从Cosmos DB获取订单
const { resource: item } = await container.item(orderId, orderId).read()
if (item === undefined || item.status === 'pending') {
reply.code(404).send()
return
}
if (item.status === 'completed') {
reply.code(200).send({ id: item.id, status: item.status, order: item.order })
} else if (item.status === 'failed') {
reply.code(500).send({ id: item.id, status: item.status, error: item.error })
}
})
订单处理器Job
订单处理器Job是另一个Node.js应用。由于这只是一个演示,它只是等待一段时间,更新Cosmos DB中的订单状态,然后退出。在现实世界的场景中,Job将处理订单,更新订单状态,并可能发送通知。 我们将其部署为Azure容器应用中的Job。上面的POST /orders端点启动Job执行。Job将订单ID作为环境变量,并使用它来更新Cosmos DB中的订单状态。 与API应用一样,Job使用托管身份来认证Azure Cosmos DB。
import { DefaultAzureCredential } from '@azure/identity'
import { CosmosClient } from '@azure/cosmos'
const credential = new DefaultAzureCredential()
const client = new CosmosClient({
endpoint: process.env.COSMOSDB_ENDPOINT,
aadCredentials: credential
})
const database = client.database('async-api')
const container = database.container('statuses')
const orderId = process.env.ORDER_ID
const orderItem = await container.item(orderId, orderId).read()
const orderResource = orderItem.resource
if (orderResource === undefined) {
console.error('Order not found')
process.exit(1)
}
// 模拟处理时间
const orderProcessingTime = Math.floor(Math.random() * 30000)
console.log(`Processing order ${orderId} for ${orderProcessingTime}ms`)
await new Promise(resolve => setTimeout(resolve, orderProcessingTime))
// 更新Cosmos DB中的订单状态
orderResource.status = 'completed'
orderResource.order.completedAt = new Date().toISOString()
await orderItem.item.replace(orderResource)
console.log(`Order ${orderId} processed`)
HTTP客户端
为了调用API并等待结果,这里有一个简单的JavaScript函数,它的工作方式与fetch类似,但会等待Job完成。它还接受一个回调函数,该函数在每次轮询状态端点时被调用,以便您可以记录状态或更新UI。
async function fetchAndWait() {
const input = arguments[0]
let init = arguments[1]
let onStatusPoll = arguments[2]
// 如果arguments[1]是一个函数
if (typeof init === 'function') {
init = undefined
onStatusPoll = arguments[1]
}
onStatusPoll = onStatusPoll || (async () => {})
// 发起初始请求
const response = await fetch(input, init)
if (response.status !== 202) {
throw new Error(`Something went wrong\nResponse: ${await response.text()}\n`)
}
const responseOrigin = new URL(response.url).origin
let statusLocation = response.headers.get('Location')
// 如果Location头不是绝对URL,构造它
statusLocation = new URL(statusLocation, responseOrigin).href
// 轮询状态端点,直到它被重定向到最终结果
while (true) {
const response = await fetch(statusLocation, {
redirect: 'follow'
})
if (response.status !== 200 && !response.redirected) {
const data = await response.json()
throw new Error(`Something went wrong\nResponse: ${JSON.stringify(data, null, 2)}\n`)
}
// 已重定向,返回最终结果并停止轮询
if (response.redirected) {
const data = await response.json()
return data
}
// Retry-After头指示客户端应何时再次轮询
const retryAfter = parseInt(response.headers.get('Retry-After')) || 10
// 调用onStatusPoll回调,以便我们可以记录状态或更新UI
await onStatusPoll({
response,
retryAfter,
})
await new Promise(resolve => setTimeout(resolve, retryAfter * 1000))
}
}
要使用该函数,我们像调用fetch一样调用它,并传递一个额外的参数,该参数是每次轮询状态端点时调用的回调函数。
const order = await fetchAndWait('/orders', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
"customer": "Contoso",
"items": [
{
"name": "Apple",
"quantity": 5
},
{
"name": "Banana",
"quantity": 3
},
],
})
}, async ({ response, retryAfter }) => {
const { status } = await response.json()
const requestUrl = response.url
messagesDiv.innerHTML += `Order status: ${status}; retrying in ${retryAfter} seconds (${requestUrl})\n`
})
// 显示最终结果
document.querySelector('#order').innerHTML = JSON.stringify(order, null, 2)
如果我们这在浏览器中运行,我们可以打开开发者工具并看到所有HTTP请求。 
在门户中,我们还可以看到Job执行历史。 
结论
通过异步请求-回复模式,我们可以构建能够处理长时间运行操作的健壮和可扩展的HTTP API。通过使用Azure容器应用Job,我们可以将处理卸载到Job执行,而不消耗API应用的资源。这种强大的方法允许API快速响应并并发处理许多请求。