Skip to content

QSTH-558: Add Workflow DLQ documentation #504

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions _snippets/workflow/workflow-dlq-message-type.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<ResponseField name="messageId" type="string" required>
Unique identifier for the DLQ message.
</ResponseField>
<ResponseField name="url" type="string">
The URL the workflow endpoint
</ResponseField>
<ResponseField name="method" type="string">
HTTP method used for the request.
</ResponseField>
<ResponseField name="header" type="object">
Initial request headers for workflow run, including the configuration headers.
</ResponseField>
<ResponseField name="body" type="string">
Request payload of the workflow run (if UTF-8).
</ResponseField>
<ResponseField name="bodyBase64" type="string">
Request body (base64-encoded, if not UTF-8).
</ResponseField>
<ResponseField name="maxRetries" type="integer">
Maximum number of retries for the workflow run.
</ResponseField>
<ResponseField name="notBefore" type="integer">
Earliest time (Unix ms) the message could be processed.
</ResponseField>
<ResponseField name="createdAt" type="integer">
Timestamp (Unix ms) when the message was created.
</ResponseField>
<ResponseField name="failureCallback" type="string">
Failure callback URL (if set).
</ResponseField>
<ResponseField name="failureCallbackHeader" type="object">
Failure callback request headers.
</ResponseField>
<ResponseField name="callerIP" type="string">
IP address of the publisher.
</ResponseField>
<ResponseField name="workflowRunId" type="string">
Workflow run ID (if applicable).
</ResponseField>
<ResponseField name="workflowCreatedAt" type="integer">
Timestamp (Unix ms) when the workflow run was created.
</ResponseField>
<ResponseField name="workflowUrl" type="string">
Workflow URL.
</ResponseField>
<ResponseField name="flowControlKey" type="string">
Flow control key (if set).
</ResponseField>
<ResponseField name="rate" type="integer">
Rate limit (if set).
</ResponseField>
<ResponseField name="parallelism" type="integer">
Parallelism (if set).
</ResponseField>
<ResponseField name="period" type="integer">
Period (if set).
</ResponseField>
<ResponseField name="responseStatus" type="integer">
HTTP response status code of the last failed delivery attempt.
</ResponseField>
<ResponseField name="responseHeader" type="object">
HTTP response headers of the last failed delivery attempt.
</ResponseField>
<ResponseField name="responseBody" type="string">
HTTP response body (if UTF-8).
</ResponseField>
<ResponseField name="responseBodyBase64" type="string">
HTTP response body (base64-encoded, if not UTF-8).
</ResponseField>
<ResponseField name="failureCallbackInfo" type="object">
Detailed information about the failure callback, including state, response body, response status and response headers.
</ResponseField>
4 changes: 4 additions & 0 deletions mint.json
Original file line number Diff line number Diff line change
Expand Up @@ -1355,6 +1355,10 @@
{
"group": "DLQ",
"pages": [
"workflow/rest/dlq/list",
"workflow/rest/dlq/get",
"workflow/rest/dlq/delete",
"workflow/rest/dlq/callback",
"workflow/rest/dlq/resume",
"workflow/rest/dlq/restart",
"workflow/rest/dlq/bulk-restart",
Expand Down
80 changes: 76 additions & 4 deletions workflow/howto/failures.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ This guide shows you how to **gracefully handle failed workflow runs**. This inv
- QStash calls your workflow URL, but the URL is not reachable - for example, because of a temporary outage of your deployment platform.
- A single step takes longer than your platform's function execution limit.

QStash automatically retries a failed step **three times with exponential backoff** to allow temporary outages to resolve.
Workflow automatically retries a failed step based on your configuration (by default, it retries three times with exponential backoff).
This helps handle temporary outages or intermittent failures gracefully.

<Frame caption="A failed step is automatically retried three times">
<img src="/img/qstash-workflow/automatic_retry.png" />
Expand Down Expand Up @@ -65,11 +66,13 @@ async def example(context: AsyncWorkflowContext[str]) -> None: ...

</CodeGroup>

Note: If you use a custom authorization method to secure your workflow endpoint, add authorization to the `failureFunction` too. Otherwise, anyone could invoke your failure function. Read more here: [securing your workflow endpoint](/workflow/howto/security).

## Using a `failureUrl`

The `failureUrl` handles cases where the service hosting your workflow URL is unavailable. In this case, a workflow failure notification is sent to another reachable endpoint.
Instead of using the built-in failure function, you can define a separate failure callback URL.
Unlike the failure function, which only works when your application is running, the failure URL allows you to handle errors even if your application is completely down.
If the URL is a different service other than your application, it will be reachable in these cases.

By pointing the failure URL to an external service (not hosted within your main application), you ensure that it remains accessible even when your primary app is unavailable.

<CodeGroup>

Expand All @@ -91,6 +94,75 @@ async def example(context: AsyncWorkflowContext[str]) -> None: ...

</CodeGroup>

The callback body sent to you will be a JSON object with the following fields:

```javascript JavaScript
{
"status": 200,
"header": { "key": ["value"] }, // Response header
"body": "YmFzZTY0IGVuY29kZWQgcm9keQ==", // base64 encoded response body
"retried": 2, // How many times we retried to deliver the original message
"maxRetries": 3, // Number of retries before the message assumed to be failed to delivered.
"sourceMessageId": "msg_xxx", // The ID of the message that triggered the callback
"topicName": "myTopic", // The name of the URL Group (topic) if the request was part of a URL Group
"endpointName": "myEndpoint", // The endpoint name if the endpoint is given a name within a topic
"url": "http://myurl.com", // The destination url of the message that triggered the callback
"method": "GET", // The http method of the message that triggered the callback
"sourceHeader": { "key": "value" }, // The http header of the message that triggered the callback
"sourceBody": "YmFzZTY0kZWQgcm9keQ==", // The base64 encoded body of the message that triggered the callback
"notBefore": "1701198458025", // The unix timestamp of the message that triggered the callback is/will be delivered in milliseconds
"createdAt": "1701198447054", // The unix timestamp of the message that triggered the callback is created in milliseconds
"scheduleId": "scd_xxx", // The scheduleId of the message if the message is triggered by a schedule
"callerIP": "178.247.74.179" // The IP address where the message that triggered the callback is published from
}
```

In Next.js you could use the following code to handle the callback:

```javascript JavaScript
// pages/api/callback.js

import { verifySignature } from "@upstash/qstash/nextjs";

function handler(req, res) {
// responses from qstash are base64-encoded
const decoded = atob(req.body.body);
console.log(decoded);

return res.status(200).end();
}

export default verifySignature(handler);

export const config = {
api: {
bodyParser: false,
},
};
```

`verifySignature` allows to verify the signature of request, which is signed by Upstash using your signing keys.
If you don't want to verify the signature, you can remove `QSTASH_CURRENT_SIGNING_KEY` and `QSTASH_NEXT_SIGNING_KEY` environment variables and remove `verifySignature` function.



## Manually Handling Failed Workflow Runs

When a workflow run fails and is moved to the Dead Letter Queue (DLQ), you have several options to handle it manually via the REST API:

### [Resume](/workflow/rest/dlq/resume)
- **What it does:** Continues a failed workflow run from exactly where it failed, preserving all successful step results.
- **When to use:** Use this if you want to retry only the failed/pending steps without re-executing the entire workflow.

### [Restart](/workflow/rest/dlq/restart)
- **What it does:** Starts the failed workflow run over from the beginning, discarding all previous step results.
- **When to use:** Use this if you want a clean execution, or if the failure may have been caused by a corrupted state that requires a fresh start.

### [Callback](/workflow/rest/dlq/callback)
- **What it does:** Reruns the failure callback for a workflow run, in case the original failure callback was not delivered or failed.
- **When to use:** Use this to ensure your system is notified of workflow failures, even if the original callback attempt did not succeed.


## Debugging failed runs

In your DLQ, filter messages via the `Workflow URL` or `Workflow Run ID` to search for a particular failure. We include all request and response headers and bodies to simplify debugging failed runs.
Expand Down
45 changes: 45 additions & 0 deletions workflow/rest/dlq/callback.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
---
title: "Rerun Failure Callback for Workflow Run"
description: "Rerun the failure callback for a failed workflow run in the DLQ"
api: "POST https://qstash.upstash.io/v2/workflows/dlq/callback/{dlqId}"
authMethod: "bearer"
---

If the failure callback for a workflow run has failed, you can use this endpoint to manually trigger the failure callback again.
This is useful for ensuring that your system is notified of workflow failures even if the original callback attempt did not succeed.

The state of the failure callback for each workflow run is included in the DLQ message response as failureCallbackInfo.state.
You can filter for all workflow runs with a failed failure callback by using the failureCallbackState filter when listing workflow runs in the DLQ with the `/v2/workflows/dlq` endpoint.

## Request

<ParamField path="dlqId" type="string" required>
The DLQ id of the failed workflow run for which you want to rerun the failure callback. You can find this id when listing all workflow runs in the DLQ with the [/v2/workflows/dlq](/workflow/rest/dlq/list) endpoint.
</ParamField>

## Response

<ResponseField name="workflowRunId" type="string">
The ID of the workflow run for which the failure callback was rerun.
</ResponseField>
<ResponseField name="workflowCreatedAt" type="integer">
Unix timestamp when the workflow run was created.
</ResponseField>

<RequestExample>

```sh
curl -X POST "https://qstash.upstash.io/v2/workflows/dlq/callback/my-dlq-id" \
-H "Authorization: Bearer <token>"
```

</RequestExample>

<ResponseExample>
```json 200 OK
{
"workflowRunId": "wfr_abcde",
"workflowCreatedAt": 1680000000000
}
```
</ResponseExample>
31 changes: 31 additions & 0 deletions workflow/rest/dlq/delete.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
---
title: "Delete a failed workflow run from the DLQ"
description: "Manually remove a failed workflow run from the DLQ"
api: "DELETE https://qstash.upstash.io/v2/workflows/dlq/{dlqId}"
authMethod: "bearer"
---

Delete a failed workflow run from the Dead Letter Queue (DLQ).

When a workflow run fails, it is moved to the DLQ. You can manually remove a failed workflow run from the DLQ using this endpoint. This is useful for cleaning up failed runs that you no longer wish to retry or analyze.

## Request

<ParamField path="dlqId" type="string">
The DLQ id of the failed workflow run you want to remove. You will see this id when
listing all workflow runs in the DLQ with the [/v2/dlq](/workflow/rest/dlq/list) endpoint.
</ParamField>

## Response

The endpoint doesn't return a response body. A status code of 200 means the workflow run was removed from the DLQ.
If the workflow run is not found in the DLQ (either it has already been removed by you, or automatically), the endpoint returns a 404 status code.

<RequestExample>

```sh
curl -X DELETE https://qstash.upstash.io/v2/workflows/dlq/my-dlq-id \
-H "Authorization: Bearer <token>"
```

</RequestExample>
71 changes: 71 additions & 0 deletions workflow/rest/dlq/get.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
---
title: "Get a failed workflow run from the DLQ"
description: "Get a single failed workflow run from the DLQ"
api: "GET https://qstash.upstash.io/v2/workflows/dlq/{dlqId}"
authMethod: "bearer"
---

Get a single failed workflow run from the Dead Letter Queue (DLQ).

## Request

<ParamField path="dlqId" type="string">
The DLQ id of the failed workflow run you want to retrieve. You will see this id when
listing all workflow runs in the DLQ with the [/v2/workflows/dlq](/workflow/rest/dlq/list) endpoint.
</ParamField>

## Response

If the workflow run is not found in the DLQ (either it has already been removed by you, or automatically), the endpoint returns a 404 status code.

<Snippet file="workflow/workflow-dlq-message-type.mdx" />

<RequestExample>

```sh
curl -X GET https://qstash.upstash.io/v2/workflows/dlq/my-dlq-id \
-H "Authorization: Bearer <token>"
```

</RequestExample>

<ResponseExample>
```json 200 OK
{
"messageId":"msg_26hZCxZCuWyyTWPmSVBrNC1RADwpgWxPcak2rQD51EMjFMuzcW7qYXpPiDyw8Gd",
"url":"https://my.app/workflow",
"method":"POST",
"header":{
"Content-Type":[
"application/json"
]
},
"maxRetries":10,
"notBefore":1752829294505,
"createdAt":1752829294505,
"failureCallback":"https://my.app/workflow",
"callerIP":"88.240.188.2",
"workflowRunId":"wfr_5XAx4IJergqkGK1v23VzR",
"workflowCreatedAt":1752829293531,
"workflowUrl":"https://my.app/workflow",
"responseStatus":489,
"responseHeader":{
"Content-Type":[
"text/plain;charset=UTF-8"
]
},
"responseBody":"{\"error\":\"WorkflowNonRetryableError\",\"message\":\"this workflow has stopped\"}",
"failureCallbackInfo":{
"state":"CALLBACK_SUCCESS",
"responseStatus":200,
"responseBody":"{\"workflowRunId\":\"wfr_Q_khHG-a414M-xKRh2kNI\"}",
"responseHeaders":{
"Content-Type":[
"text/plain;charset=UTF-8"
]
}
},
"dlqId":"1752829295505-0"
}
```
</ResponseExample>
Loading