In this post, we will build a basic example of content moderation for web applications using:
- Next.js
- AWS Lambda
- Serverless Kafka (Upstash)
- Serverless Redis (Upstash)
- Sightengine content moderation API
You can check the GitHub repo for the full example.
Scenario
Our application is a simple web page where users can submit comments. We want to filter inappropriate content before displaying it publicly. Here is a screenshot from the demo.
When a user submits a comment, we first send it to a Kafka topic. The produced message triggers an AWS Lambda function, which uses the Sightengine API to decide whether the comment is inappropriate. If the content is clean, it is pushed to the Redis list named comments; otherwise it is pushed to the list named rejected-comments.
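The routing rule can be sketched as a small pure function. This is only an illustration: targetList is a hypothetical helper name, and the verdict strings ("none", "profanity", "personal") are the ones returned by the filterContent function in the Lambda code of this post.

```javascript
// Hypothetical helper: maps a moderation verdict to the Redis list name.
// "none" means the moderation check found nothing objectionable.
function targetList(verdict) {
  return verdict === "none" ? "comments" : "rejected-comments";
}

console.log(targetList("none")); // comments
console.log(targetList("profanity")); // rejected-comments
```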
With this flow, users do not see their comment immediately after posting it. A possible improvement is to show a not-yet-moderated comment only to its author, which makes the experience smoother.
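One way to approach that improvement is to keep the author's not-yet-moderated comments in the browser and merge them with the lists loaded from the server. The sketch below is not part of the demo; visibleComments and the pending array are hypothetical names, and matching comments by their text is a simplifying assumption (a real application would more likely attach an ID to each comment).

```javascript
// Hypothetical helper: combine the moderated comments loaded from the server
// with comments this browser has submitted but that are not moderated yet.
function visibleComments(accepted, rejected, pending) {
  const decided = new Set([...accepted, ...rejected]);
  // A pending comment stays visible to its author until moderation decides.
  const stillPending = pending.filter((text) => !decided.has(text));
  return [...stillPending, ...accepted];
}

const accepted = ["Nice post!"];
const rejected = ["some rude text"];
const pending = ["Nice post!", "Thanks for sharing"];

console.log(visibleComments(accepted, rejected, pending));
// [ 'Thanks for sharing', 'Nice post!' ]
```

Once a pending comment shows up in either server list, it is dropped from the pending view, so a rejected comment disappears for its author as well.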
The Web Application
This is a basic Next.js application which sends the submitted comment to a Kafka topic through an API route; the topic in turn triggers the AWS Lambda function. The page also loads the accepted and rejected comments from Redis.
import Head from "next/head";
import styles from "../styles/Home.module.css";
import { useEffect, useState } from "react";

export default function Home() {
  const [data, setData] = useState([]); // accepted comments
  const [data2, setData2] = useState([]); // censored comments
  const [loading, setLoading] = useState(false);

  // Load both lists once, when the page is first rendered.
  useEffect(() => {
    loadComments();
  }, []);

  // Post the comment to the submit API route, then reload the lists.
  const sendComment = async (event) => {
    event.preventDefault();
    const res = await fetch("/api/submit", {
      body: JSON.stringify({
        comment: event.target.comment.value,
      }),
      headers: {
        "Content-Type": "application/json",
      },
      method: "POST",
    });
    const result = await res.json();
    console.log(result);
    event.target.reset();
    loadComments();
  };

  // Fetch the accepted and censored comments from the load API route.
  const loadComments = async () => {
    setLoading(true);
    setData([]);
    setData2([]);
    const res = await fetch("/api/load");
    const result = await res.json();
    setData(result.comments);
    setData2(result.censored);
    setLoading(false);
    console.log(result);
  };

  return (
    <div className={styles.container}>
      <Head>
        <title>Create Next App</title>
        <meta name="description" content="Generated by create next app" />
        <link rel="icon" href="/favicon.ico" />
      </Head>
      <main className={styles.main}>
        <h4 className={styles.title}>
          Content Moderation
          <br />
          with Upstash Kafka
        </h4>
        <p className={styles.description}>
          Check the <a href={"#"}>blog post </a> for details.
        </p>
        <div className={styles.grid}>
          <div className={styles.div1}>
            <form onSubmit={sendComment}>
              <textarea name="comment" className={styles.textarea} />
              <br />
              <button type="submit" className={styles.submitbutton}>
                Submit
              </button>
            </form>
          </div>
          <div className={styles.div2}>
            <h3>Accepted Comments</h3>
            {loading ? (
              <div className={styles.loader}></div>
            ) : (
              data.map((item, index) => (
                <p key={index} className={styles.comment}>
                  {item}
                </p>
              ))
            )}
          </div>
          <div className={styles.div2}>
            <h3>Censored Comments</h3>
            {loading ? (
              <div className={styles.loader}></div>
            ) : (
              data2.map((item, index) => (
                <p key={index} className={styles.censored}>
                  {item}
                </p>
              ))
            )}
          </div>
        </div>
      </main>
      <footer className={styles.footer}>
        <a
          href="https://upstash.com?utm_source=content-moderation-demo"
          target="_blank"
          rel="noopener noreferrer"
        >
          Powered by <span className={styles.logo}> Upstash</span>
        </a>
      </footer>
    </div>
  );
}
We need two API routes: submit.js, which sends the comment to Kafka, and load.js, which loads the comments from Redis. Create a Redis database and a Kafka cluster using the Upstash console, then replace the Redis and Kafka URLs and credentials below.
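Instead of pasting credentials into the source files, you may prefer to read them from environment variables. The sketch below is an assumption-laden illustration, not part of the demo: readConfig is a hypothetical helper, and the variable names are placeholders you would replace with whatever names you configure.

```javascript
// Hypothetical helper: read required settings from an environment object
// and fail early, with a clear message, if any of them is missing.
function readConfig(env, names) {
  const missing = names.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error("Missing environment variables: " + missing.join(", "));
  }
  return Object.fromEntries(names.map((name) => [name, env[name]]));
}

// The variable names are assumptions; use whatever names you configure.
const REDIS_VARS = ["UPSTASH_REDIS_REST_URL", "UPSTASH_REDIS_REST_TOKEN"];

// In an API route you would pass process.env instead of this sample object.
const sampleEnv = {
  UPSTASH_REDIS_REST_URL: "https://example.invalid",
  UPSTASH_REDIS_REST_TOKEN: "secret",
};

const config = readConfig(sampleEnv, REDIS_VARS);
console.log(Object.keys(config));
```

The resulting values could then be passed as the url and token options of the Redis client in place of the REPLACE_HERE strings.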
Submit
import { Kafka } from "@upstash/kafka";

const kafka = new Kafka({
  url: "REPLACE_HERE",
  username: "REPLACE_HERE",
  password: "REPLACE_HERE",
});

export default async function handler(req, res) {
  // Produce the submitted comment to the "comments" topic.
  const producer = kafka.producer();
  const result = await producer.produce("comments", req.body.comment);
  console.log(result);
  res.status(200).json(result);
}
Load
import { Redis } from "@upstash/redis";

const redis = new Redis({
  url: "REPLACE_HERE",
  token: "REPLACE_HERE",
});

export default async function handler(req, res) {
  // LRANGE is inclusive on both ends, so 0..100 returns up to 101 items.
  const comments = await redis.lrange("comments", 0, 100);
  const censored = await redis.lrange("rejected-comments", 0, 100);
  const result = { comments: comments, censored: censored };
  res.status(200).json(result);
}
The AWS Lambda Function
A Kafka topic will trigger this AWS Lambda function with each new comment. The function calls the Sightengine API and, depending on the response, pushes the comment to either the comments list or the rejected-comments list in Redis. Here are the required steps:
- Create a Serverless function (select Node.js runtime) using AWS SAM or Serverless Framework.
- Set your Kafka topic as the trigger for the Lambda function as explained here.
- Create a Sightengine account and get API keys.
Here is the function code:
const axios = require("axios");
const FormData = require("form-data");
const { Redis } = require("@upstash/redis");

const redis = new Redis({
  url: "REPLACE_HERE",
  token: "REPLACE_HERE",
});

module.exports.consume = async (event) => {
  if (!event.records) {
    return { response: "no kafka event" };
  }
  // event.records groups the incoming messages by topic partition.
  for (const messages of Object.values(event.records)) {
    for (const msg of messages) {
      // Message values are base64-encoded. Decode as UTF-8 so that
      // non-ASCII characters in comments are preserved.
      const buff = Buffer.from(msg.value, "base64");
      const text = buff.toString("utf8");
      const res = await filterContent(text);
      if (res === "none") {
        await redis.lpush("comments", text);
      } else {
        await redis.lpush("rejected-comments", text);
      }
    }
  }
  return { response: "success" };
};

// Returns "profanity", "personal", or "none" for the given text.
async function filterContent(text) {
  const data = new FormData();
  data.append("text", text);
  data.append("lang", "en");
  data.append("mode", "standard");
  data.append("api_user", "REPLACE_HERE");
  data.append("api_secret", "REPLACE_HERE");
  const res = await axios({
    url: "https://api.sightengine.com/1.0/text/check.json",
    method: "post",
    data: data,
    headers: data.getHeaders(),
  });
  if (res.data.profanity.matches.length > 0) return "profanity";
  else if (res.data.personal.matches.length > 0) return "personal";
  else return "none";
}
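To make the shape of the incoming event more concrete, here is a small sketch that can run locally without AWS. The decodeRecords helper and the sample event are hypothetical and trimmed to the fields the function reads; a real event carries more metadata. The values are decoded as UTF-8, which assumes the comment was produced as UTF-8 text.

```javascript
// Hypothetical helper: collect the decoded text of every record in the event.
function decodeRecords(event) {
  const texts = [];
  // event.records maps "topic-partition" keys to arrays of records.
  for (const records of Object.values(event.records || {})) {
    for (const record of records) {
      // Record values arrive base64-encoded; decode them as UTF-8.
      texts.push(Buffer.from(record.value, "base64").toString("utf8"));
    }
  }
  return texts;
}

// Hand-made sample event, trimmed to the fields this function reads.
const sampleEvent = {
  records: {
    "comments-0": [
      {
        topic: "comments",
        partition: 0,
        offset: 0,
        value: Buffer.from("héllo").toString("base64"),
      },
    ],
  },
};

console.log(decodeRecords(sampleEvent)); // [ 'héllo' ]
```

A sample event like this one can also be passed to the consume handler in a local test, provided the Redis and Sightengine calls are pointed at test credentials.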
Test & Run
If you have deployed the AWS Lambda function and added Kafka as its trigger, you are ready to test your system. Run npm run dev in the folder of your web application. Post a comment, wait about 5 seconds, then refresh the page. The comment should appear on the page as either accepted or censored. If it does not work:
- Check the logs of your AWS Lambda function. Ensure there is no error log.
- Check if the messages are in the Kafka topic. You may use the Upstash REST API.
- Check the lists in the Redis database.
Why did we use Kafka?
We could process the submitted comments directly in the backend. So why do we need Kafka? Kafka opens up many possibilities for processing the same data in different ways. For example, you can easily move the comments to your database or data warehouse using a Kafka connector. You can run other processes that analyze your users' comments. You can set up alerting systems that process the comments in real time. Kafka becomes the data hub of your system, where different systems can feed from the same data.
Is AWS Lambda a must?
Not at all. We chose AWS Lambda to process the messages in order to keep the application serverless. You can also use your own servers, instances, or containers to process the Kafka messages.
Closing Words
In this post, I tried to show how easy it is to implement a complex system end to end using serverless tools. We are planning to publish similar examples, so follow us on Twitter and Discord.