【微服务】如何用Azure容器应用Job处理异步HTTP API请求

2023-03-14T21:47:01+08:00 | 6分钟阅读 | 更新于 2023-03-14T21:47:01+08:00

Macro Zhao

【微服务】如何用Azure容器应用Job处理异步HTTP API请求

推荐超级课程:

@TOC

在构建HTTP API时,同步执行长时间运行的任务可能会很有诱惑力。这种方法可能导致响应缓慢、超时和资源耗尽。如果请求超时或连接断开,客户端将不知道操作是否完成。对于CPU密集型任务,这种方法还可能使服务器陷入困境,使其无法响应其他请求。 在本文中,我们将探讨如何使用Azure容器应用构建异步HTTP API。我们将创建一个简单的API,实现异步请求-回复模式:API托管在容器应用中,而异步处理则在Job中完成。这种方法为长时间运行的任务提供了更强大和可扩展的解决方案。

Azure容器应用中的长时间运行API请求


Azure容器应用是一个无服务器容器平台。它非常适合托管各种工作负载,包括HTTP API。 与其他无服务器和PaaS平台一样,Azure容器应用旨在处理短期请求——其入口目前具有4分钟的最大超时时间。作为一个自动扩展平台,它旨在根据传入请求的数量动态扩展。在缩减规模时,会移除副本。如果处理请求的副本被移除,长时间运行的请求可能会突然终止。

Azure容器应用Job


Azure容器应用有两种资源:应用和Job。应用是长时间运行的服务,响应HTTP请求或事件。Job是运行到完成的任务,可以通过计划或事件触发。 Job还可以通过编程方式触发。这使得它们非常适合在HTTP API中实现异步处理。API可以启动Job执行来处理请求,并立即返回响应。Job可以需要多长时间来完成处理。客户端可以轮询应用中的状态端点,以检查Job是否完成并获取结果。

异步请求-回复模式


异步请求-回复是HTTP API处理长时间运行操作的一种常见模式。API不是等待操作完成,而是返回一个状态码,指示操作已经开始。然后,客户端可以轮询API,以检查操作是否完成。 以下是Azure容器应用中该模式的应用方式:

  1. 客户端向API(作为容器应用托管)发送请求以启动操作。
  2. API保存请求(我们的示例使用Azure Cosmos DB),启动Job来处理操作,并返回202 Accepted状态码以及指向状态端点的Location头。
  3. 客户端轮询状态端点。当操作正在进行时,状态端点返回200 OK状态码,以及指示客户端应何时再次轮询的Retry-After头。
  4. 当操作完成时,状态端点返回303 See Other状态码,以及指向结果的Location头。客户端自动跟随重定向以获取结果。

    交互图

异步HTTP API应用


您可以在本文上方链接中找到源代码。 该API是一个简单的Node.js应用,使用Fastify。它演示了如何构建一个接受订单并将订单处理卸载到Job的异步HTTP API。该应用有几个简单的端点。

POST /orders

此端点接受其正文中的订单。它将订单保存到Cosmos DB,状态为“待处理”,并启动Job执行以处理订单。

fastify.post('/orders', async (request, reply) => {
    const orderId = randomUUID()
    // 将订单保存到Cosmos DB
    await container.items.create({
        id: orderId,
        status: 'pending',
        order: request.body,
    })
    // 启动Job执行
    await startProcessorJobExecution(orderId)
    // 返回202 Accepted和Location头
    reply.code(202).header('Location', '/orders/status/' + orderId).send()
})

我们将在本文稍后介绍Job。在上面的代码片段中,startProcessorJobExecution是一个启动Job执行的功能。它使用Azure容器应用管理SDK来启动Job。

const credential = new DefaultAzureCredential()
const containerAppsClient = new ContainerAppsAPIClient(credential, subscriptionId)
// ...
async function startProcessorJobExecution(orderId) {
    // 获取现有Job的模板
    const { template: processorJobTemplate } = 
        await containerAppsClient.jobs.get(resourceGroupName, processorJobName)
    // 将订单ID添加到Job的环境变量中
    const environmentVariables = processorJobTemplate.containers[0].env
    environmentVariables.push({ name: 'ORDER_ID', value: orderId })
    const jobStartTemplate = { template: processorJobTemplate }
    // 使用修改后的模板启动Job执行
    const jobExecution = await containerAppsClient.jobs.beginStartAndWait(
        resourceGroupName, processorJobName, {
            template: processorJobTemplate,
        }
    )
}

Job将订单ID作为环境变量。为了设置环境变量,我们使用修改后的模板启动Job执行,该模板包含订单ID。 我们使用托管身份来同时认证Azure容器应用管理SDK和Cosmos DB SDK。

GET /orders/status/:orderId

上一个端点返回202 Accepted状态码和指向此状态端点的Location头。客户端可以轮询此端点以检查订单的状态。 此请求处理程序从Cosmos DB检索订单。如果订单仍处于待处理状态,它返回200 OK状态码和指示客户端应何时再次轮询的Retry-After头。如果订单已完成,它返回303 See Other状态码和指向结果的Location头。

fastify.get('/orders/status/:orderId', async (request, reply) => {
    const { orderId } = request.params
    // 从Cosmos DB获取订单
    const { resource: item } = await container.item(orderId, orderId).read()
    if (item === undefined) {
        reply.code(404).send()
        return
    }
    if (item.status === 'pending') {
        reply.code(200).headers({
            'Retry-After': 10,
        }).send({ status: item.status })
    } else {
        reply.code(303).header('Location', '/orders/' + orderId).send()
    }
})

GET /orders/:orderId

此端点返回订单处理的结果。状态端点在订单完成时重定向到此资源。它从Cosmos DB检索订单并返回。

fastify.get('/orders/:orderId', async (request, reply) => {
    const { orderId } = request.params
    // 从Cosmos DB获取订单
    const { resource: item } = await container.item(orderId, orderId).read()
    if (item === undefined || item.status === 'pending') {
        reply.code(404).send()
        return
    }
    if (item.status === 'completed') {
        reply.code(200).send({ id: item.id, status: item.status, order: item.order })
    } else if (item.status === 'failed') {
        reply.code(500).send({ id: item.id, status: item.status, error: item.error })
    }
})

订单处理器Job


订单处理器Job是另一个Node.js应用。由于这只是一个演示,它只是等待一段时间,更新Cosmos DB中的订单状态,然后退出。在现实世界的场景中,Job将处理订单,更新订单状态,并可能发送通知。 我们将其部署为Azure容器应用中的Job。上面的POST /orders端点启动Job执行。Job将订单ID作为环境变量,并使用它来更新Cosmos DB中的订单状态。 与API应用一样,Job使用托管身份来认证Azure Cosmos DB。

import { DefaultAzureCredential } from '@azure/identity'
import { CosmosClient } from '@azure/cosmos'
const credential = new DefaultAzureCredential()
const client = new CosmosClient({ 
    endpoint: process.env.COSMOSDB_ENDPOINT, 
    aadCredentials: credential
})
const database = client.database('async-api')
const container = database.container('statuses')
const orderId = process.env.ORDER_ID
const orderItem = await container.item(orderId, orderId).read()
const orderResource = orderItem.resource
if (orderResource === undefined) {
    console.error('Order not found')
    process.exit(1)
}
// 模拟处理时间
const orderProcessingTime = Math.floor(Math.random() * 30000)
console.log(`Processing order ${orderId} for ${orderProcessingTime}ms`)
await new Promise(resolve => setTimeout(resolve, orderProcessingTime))
// 更新Cosmos DB中的订单状态
orderResource.status = 'completed'
orderResource.order.completedAt = new Date().toISOString()
await orderItem.item.replace(orderResource)
console.log(`Order ${orderId} processed`)

HTTP客户端


为了调用API并等待结果,这里有一个简单的JavaScript函数,它的工作方式与fetch类似,但会等待Job完成。它还接受一个回调函数,该函数在每次轮询状态端点时被调用,以便您可以记录状态或更新UI。

async function fetchAndWait() {
    const input = arguments[0]
    let init = arguments[1]
    let onStatusPoll = arguments[2]
    // 如果arguments[1]是一个函数
    if (typeof init === 'function') {
        init = undefined
        onStatusPoll = arguments[1]
    }
    onStatusPoll = onStatusPoll || (async () => {})
    // 发起初始请求
    const response = await fetch(input, init)
    if (response.status !== 202) {
        throw new Error(`Something went wrong\nResponse: ${await response.text()}\n`)
    }
    const responseOrigin = new URL(response.url).origin
    let statusLocation = response.headers.get('Location')
    // 如果Location头不是绝对URL,构造它
    statusLocation = new URL(statusLocation, responseOrigin).href
    // 轮询状态端点,直到它被重定向到最终结果
    while (true) {
        const response = await fetch(statusLocation, {
            redirect: 'follow'
        })
        if (response.status !== 200 && !response.redirected) {
            const data = await response.json()
            throw new Error(`Something went wrong\nResponse: ${JSON.stringify(data, null, 2)}\n`)
        }
        // 已重定向,返回最终结果并停止轮询
        if (response.redirected) {
            const data = await response.json()
            return data
        }
        // Retry-After头指示客户端应何时再次轮询
        const retryAfter = parseInt(response.headers.get('Retry-After')) || 10
        // 调用onStatusPoll回调,以便我们可以记录状态或更新UI
        await onStatusPoll({
            response,
            retryAfter,
        })
        await new Promise(resolve => setTimeout(resolve, retryAfter * 1000))
    }
}

要使用该函数,我们像调用fetch一样调用它,并传递一个额外的参数,该参数是每次轮询状态端点时调用的回调函数。

const order = await fetchAndWait('/orders', {
    method: 'POST',
    headers: {
        'Content-Type': 'application/json'
    },
    body: JSON.stringify({
        "customer": "Contoso",
        "items": [
            {
                "name": "Apple",
                "quantity": 5
            },
            {
                "name": "Banana",
                "quantity": 3
            },
        ],
    })
}, async ({ response, retryAfter }) => {
    const { status } = await response.json()
    const requestUrl = response.url
    messagesDiv.innerHTML += `Order status: ${status}; retrying in ${retryAfter} seconds (${requestUrl})\n`
})
// 显示最终结果
document.querySelector('#order').innerHTML = JSON.stringify(order, null, 2)

如果我们这在浏览器中运行,我们可以打开开发者工具并看到所有HTTP请求。 ![](images/2878927998773323230.png =1005x)

在门户中,我们还可以看到Job执行历史。 ![](images/2022647255155498728.png =927x)

结论


通过异步请求-回复模式,我们可以构建能够处理长时间运行操作的健壮和可扩展的HTTP API。通过使用Azure容器应用Job,我们可以将处理卸载到Job执行,而不消耗API应用的资源。这种强大的方法允许API快速响应并并发处理许多请求。

© 2011 - 2025 Macro Zhao的分享站

关于我

如遇到加载502错误,请尝试刷新😄

Hi,欢迎访问 Macro Zhao 的博客。Macro Zhao(或 Macro)是我在互联网上经常使用的名字。

我是一个热衷于技术探索和分享的IT工程师,在这里我会记录分享一些关于技术、工作和生活上的事情。

我的CSDN博客:
https://macro-zhao.blog.csdn.net/

欢迎你通过评论或者邮件与我交流。
Mail Me

推荐好玩(You'll Like)
  • AI 动·画
    • 这是一款有趣·免费的能让您画的画中的角色动起来的AI工具。
    • 支持几十种动作生成。
我的项目(My Projects)
  • 爱学习网

  • 小乙日语App

    • 这是一个帮助日语学习者学习日语的App。
      (当然初衷也是为了自用😄)
    • 界面干净,简洁,漂亮!
    • 其中包含 N1 + N2 的全部单词和语法。
    • 不需注册,更不需要订阅!完全免费!
  • 小乙日文阅读器

    • 词汇不够?照样能读日语名著!
    • 越读积累越多,积跬步致千里!
    • 哪里不会点哪里!妈妈再也不担心我读不了原版读物了!
赞助我(Sponsor Me)

如果你喜欢我的作品或者发现它们对你有所帮助,可以考虑给我买一杯咖啡 ☕️。这将激励我在未来创作和分享更多的项目和技术。🦾

👉 请我喝一杯咖啡

If you like my works or find them helpful, please consider buying me a cup of coffee ☕️. It inspires me to create and share more projects in the future. 🦾

👉 Buy me a coffee