Introduction
This example demonstrates AI-driven data processing with Upstash Workflow. The workflow downloads a large dataset, processes it in chunks using OpenAI’s GPT-4 model, aggregates the results, and generates a report.
Use Case
Our workflow will:
- Receive a request to process a dataset
- Download the dataset from a remote source
- Process the data in chunks using OpenAI
- Aggregate results
- Generate and send a final report
Code Example
import { serve } from "@upstash/workflow/nextjs"
import {
  aggregateResults,
  generateReport,
  sendReport,
  getDatasetUrl,
  splitIntoChunks,
} from "./utils"

// Subset of the OpenAI chat completions response that this workflow reads
type OpenAiResponse = {
  choices: {
    message: {
      role: string,
      content: string
    }
  }[]
}

export const { POST } = serve<{ datasetId: string; userId: string }>(
  async (context) => {
    const request = context.requestPayload

    // Step 1: resolve the dataset URL and download the dataset
    const datasetUrl = await context.run("get-dataset-url", async () => {
      return await getDatasetUrl(request.datasetId)
    })

    const { body: dataset } = await context.call("download-dataset", {
      url: datasetUrl,
      method: "GET"
    })

    // Step 2: split the dataset and process each chunk with OpenAI
    const chunkSize = 1000
    const chunks = splitIntoChunks(dataset, chunkSize)
    const processedChunks: string[] = []

    for (let i = 0; i < chunks.length; i++) {
      const { body: processedChunk } = await context.call<OpenAiResponse>(
        `process-chunk-${i}`,
        {
          url: "https://api.openai.com/v1/chat/completions",
          headers: { authorization: `Bearer ${process.env.OPENAI_API_KEY}` },
          method: "POST",
          body: {
            model: "gpt-4",
            messages: [
              {
                role: "system",
                content:
                  "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
              },
              {
                role: "user",
                content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`,
              },
            ],
            max_tokens: 150,
          },
        }
      )

      processedChunks.push(processedChunk.choices[0].message.content)

      // Step 3: aggregate every 10 chunks and after the final chunk
      if (i % 10 === 9 || i === chunks.length - 1) {
        await context.run(`aggregate-results${i}`, async () => {
          await aggregateResults(processedChunks)
        })
        // Clear the buffer outside context.run: completed steps are skipped
        // on re-execution, so a mutation made inside one is not replayed.
        processedChunks.length = 0
      }
    }

    // Step 4: generate the report and send it to the user
    const report = await context.run("generate-report", async () => {
      return await generateReport(request.datasetId)
    })

    await context.run("send-report", async () => {
      await sendReport(report, request.userId)
    })
  }
)
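The workflow above expects a JSON payload with `datasetId` and `userId`. As a rough sketch of how a caller might start it, the helper below builds the arguments for a plain `fetch` call. The name `buildWorkflowRequest`, the endpoint URL, and the sample IDs are all assumptions made for illustration; the real URL depends on where the route file lives in your project.

```typescript
// Payload shape expected by the workflow endpoint (taken from the serve<...> type above).
type WorkflowPayload = { datasetId: string; userId: string }

type WorkflowRequest = {
  url: string
  init: { method: string; headers: Record<string, string>; body: string }
}

// Hypothetical helper: builds the arguments for a fetch() call that starts the workflow.
// The endpoint URL is an assumption and must match your own route.
function buildWorkflowRequest(
  endpointUrl: string,
  payload: WorkflowPayload
): WorkflowRequest {
  return {
    url: endpointUrl,
    init: {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      // The workflow reads this JSON through context.requestPayload
      body: JSON.stringify(payload),
    },
  }
}

// Usage (hypothetical endpoint and IDs):
// const { url, init } = buildWorkflowRequest(
//   "http://localhost:3000/api/workflow",
//   { datasetId: "dataset-123", userId: "user-456" }
// )
// await fetch(url, init)
```

Keeping the request construction separate from the network call makes the payload shape easy to check before anything is sent.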
Code Breakdown
1. Preparing our data
We start by retrieving the dataset URL and then downloading the dataset:
const datasetUrl = await context.run("get-dataset-url", async () => {
  return await getDatasetUrl(request.datasetId)
})

const { body: dataset } = await context.call("download-dataset", {
  url: datasetUrl,
  method: "GET"
})
Note that we use context.call for the download, a way to make HTTP requests that run for much longer than your serverless execution limit would normally allow.
2. Processing our data
We split the dataset into chunks and process each one using OpenAI’s GPT-4 model:
for (let i = 0; i < chunks.length; i++) {
  const { body: processedChunk } = await context.call<OpenAiResponse>(
    `process-chunk-${i}`,
    {
      url: "https://api.openai.com/v1/chat/completions",
      headers: { authorization: `Bearer ${process.env.OPENAI_API_KEY}` },
      method: "POST",
      body: {
        model: "gpt-4",
        messages: [
          {
            role: "system",
            content:
              "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
          },
          {
            role: "user",
            content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`,
          },
        ],
        max_tokens: 150,
      },
    }
  )
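The `splitIntoChunks` helper that produces `chunks` is imported from `./utils` and is not shown in this example. The following is a minimal sketch of what it might look like, assuming the downloaded dataset has already been parsed into an array of records; the actual helper may work differently (for example, on raw text).

```typescript
// Hypothetical implementation of the `splitIntoChunks` helper from "./utils".
// Assumes the dataset is an array of records.
function splitIntoChunks<T>(items: T[], chunkSize: number): T[][] {
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new RangeError("chunkSize must be a positive integer")
  }

  const chunks: T[][] = []
  for (let start = 0; start < items.length; start += chunkSize) {
    // slice() clamps the end index, so the last chunk may be shorter
    chunks.push(items.slice(start, start + chunkSize))
  }
  return chunks
}

// Usage: five records with a chunk size of 2 yield three chunks
const exampleChunks = splitIntoChunks([1, 2, 3, 4, 5], 2)
console.log(exampleChunks.length) // 3
```

Each chunk is serialized with `JSON.stringify` before it is sent to OpenAI, so the chunk size should be chosen with the model's context limit in mind.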
3. Aggregating our data
After processing our data in smaller chunks to avoid any function timeouts, we aggregate results every 10 chunks:
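  if (i % 10 === 9 || i === chunks.length - 1) {
    await context.run(`aggregate-results${i}`, async () => {
      await aggregateResults(processedChunks)
    })
    // Clear the buffer outside context.run: completed steps are skipped
    // on re-execution, so a mutation made inside one is not replayed.
    processedChunks.length = 0
  }
}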
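To make the batching condition concrete, here is a small standalone sketch that lists the loop indices at which the aggregation step would run. The function name `aggregationPoints` and the `batchSize` parameter are illustrative and not part of the workflow code.

```typescript
// Returns the loop indices at which aggregation would run, using the same
// condition as the workflow: the last index of every full batch, plus the final chunk.
function aggregationPoints(chunkCount: number, batchSize = 10): number[] {
  const points: number[] = []
  for (let i = 0; i < chunkCount; i++) {
    if (i % batchSize === batchSize - 1 || i === chunkCount - 1) {
      points.push(i)
    }
  }
  return points
}

console.log(aggregationPoints(25)) // [9, 19, 24]
```

With 25 chunks, aggregation runs after chunks 9 and 19 (two full batches of 10) and once more after chunk 24 for the remaining five results.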
4. Sending a report
Finally, we generate a report based on the aggregated results and send it to the user:
const report = await context.run("generate-report", async () => {
  return await generateReport(request.datasetId)
})

await context.run("send-report", async () => {
  await sendReport(report, request.userId)
})
Key Features
- Non-blocking HTTP Calls: We use context.call for API requests so they don’t consume the endpoint’s execution time (great for optimizing serverless cost).
- Long-running tasks: The dataset download can take up to 2 hours, though in practice it is limited by function memory.