diff --git a/_snippets/workflow/workflow-dlq-message-type.mdx b/_snippets/workflow/workflow-dlq-message-type.mdx new file mode 100644 index 00000000..efad8dae --- /dev/null +++ b/_snippets/workflow/workflow-dlq-message-type.mdx @@ -0,0 +1,72 @@ + + Unique identifier for the DLQ message. + + + The URL of the workflow endpoint. + + + HTTP method used for the request. + + + Initial request headers for the workflow run, including the configuration headers. + + + Request payload of the workflow run (if UTF-8). + + + Request body (base64-encoded, if not UTF-8). + + + Maximum number of retries for the workflow run. + + + Earliest time (Unix ms) the message could be processed. + + + Timestamp (Unix ms) when the message was created. + + + Failure callback URL (if set). + + + Failure callback request headers. + + + IP address of the publisher. + + + Workflow run ID (if applicable). + + + Timestamp (Unix ms) when the workflow run was created. + + + Workflow URL. + + + Flow control key (if set). + + + Rate limit (if set). + + + Parallelism (if set). + + + Period (if set). + + + HTTP response status code of the last failed delivery attempt. + + + HTTP response headers of the last failed delivery attempt. + + + HTTP response body (if UTF-8). + + + HTTP response body (base64-encoded, if not UTF-8). + + + Detailed information about the failure callback, including state, response body, response status and response headers. 
+ \ No newline at end of file diff --git a/mint.json b/mint.json index a4c9015c..4940aa7a 100644 --- a/mint.json +++ b/mint.json @@ -1355,6 +1355,10 @@ { "group": "DLQ", "pages": [ + "workflow/rest/dlq/list", + "workflow/rest/dlq/get", + "workflow/rest/dlq/delete", + "workflow/rest/dlq/callback", "workflow/rest/dlq/resume", "workflow/rest/dlq/restart", "workflow/rest/dlq/bulk-restart", diff --git a/workflow/howto/failures.mdx b/workflow/howto/failures.mdx index 48d04ac1..11b27427 100644 --- a/workflow/howto/failures.mdx +++ b/workflow/howto/failures.mdx @@ -10,7 +10,8 @@ This guide shows you how to **gracefully handle failed workflow runs**. This inv - QStash calls your workflow URL, but the URL is not reachable - for example, because of a temporary outage of your deployment platform. - A single step takes longer than your platform's function execution limit. -QStash automatically retries a failed step **three times with exponential backoff** to allow temporary outages to resolve. +Workflow automatically retries a failed step based on your configuration (by default, it retries three times with exponential backoff). +This helps handle temporary outages or intermittent failures gracefully. @@ -65,11 +66,13 @@ async def example(context: AsyncWorkflowContext[str]) -> None: ... -Note: If you use a custom authorization method to secure your workflow endpoint, add authorization to the `failureFunction` too. Otherwise, anyone could invoke your failure function. Read more here: [securing your workflow endpoint](/workflow/howto/security). - ## Using a `failureUrl` -The `failureUrl` handles cases where the service hosting your workflow URL is unavailable. In this case, a workflow failure notification is sent to another reachable endpoint. +Instead of using the built-in failure function, you can define a separate failure callback URL. 
+Unlike the failure function, which only works when your application is running, the failure URL allows you to handle errors even if your application is completely down. +If the URL points to a service other than your application, it will still be reachable in these cases. + +By pointing the failure URL to an external service (not hosted within your main application), you ensure that it remains accessible even when your primary app is unavailable. @@ -91,6 +94,75 @@ async def example(context: AsyncWorkflowContext[str]) -> None: ... +The callback body sent to you will be a JSON object with the following fields: + +```javascript JavaScript +{ + "status": 200, + "header": { "key": ["value"] }, // Response header + "body": "YmFzZTY0IGVuY29kZWQgcm9keQ==", // base64 encoded response body + "retried": 2, // How many times we retried to deliver the original message + "maxRetries": 3, // Number of retries before the message is considered to have failed delivery + "sourceMessageId": "msg_xxx", // The ID of the message that triggered the callback + "topicName": "myTopic", // The name of the URL Group (topic) if the request was part of a URL Group + "endpointName": "myEndpoint", // The endpoint name if the endpoint is given a name within a topic + "url": "http://myurl.com", // The destination url of the message that triggered the callback + "method": "GET", // The http method of the message that triggered the callback + "sourceHeader": { "key": "value" }, // The http header of the message that triggered the callback + "sourceBody": "YmFzZTY0kZWQgcm9keQ==", // The base64 encoded body of the message that triggered the callback + "notBefore": "1701198458025", // The unix timestamp (in milliseconds) at which the message that triggered the callback is/will be delivered + "createdAt": "1701198447054", // The unix timestamp (in milliseconds) at which the message that triggered the callback was created + "scheduleId": "scd_xxx", // The scheduleId of the message if the message is triggered by a schedule + 
 "callerIP": "178.247.74.179" // The IP address where the message that triggered the callback is published from +} +``` + +In Next.js you could use the following code to handle the callback: + +```javascript JavaScript +// pages/api/callback.js + +import { verifySignature } from "@upstash/qstash/nextjs"; + +function handler(req, res) { + // responses from qstash are base64-encoded + const decoded = atob(req.body.body); + console.log(decoded); + + return res.status(200).end(); +} + +export default verifySignature(handler); + +export const config = { + api: { + bodyParser: false, + }, +}; +``` + +`verifySignature` verifies the signature of the request, which is signed by Upstash using your signing keys. +If you don't want to verify the signature, you can remove the `QSTASH_CURRENT_SIGNING_KEY` and `QSTASH_NEXT_SIGNING_KEY` environment variables and remove the `verifySignature` function. + + + +## Manually Handling Failed Workflow Runs + +When a workflow run fails and is moved to the Dead Letter Queue (DLQ), you have several options to handle it manually via the REST API: + +### [Resume](/workflow/rest/dlq/resume) +- **What it does:** Continues a failed workflow run from exactly where it failed, preserving all successful step results. +- **When to use:** Use this if you want to retry only the failed/pending steps without re-executing the entire workflow. + +### [Restart](/workflow/rest/dlq/restart) +- **What it does:** Starts the failed workflow run over from the beginning, discarding all previous step results. +- **When to use:** Use this if you want a clean execution, or if the failure may have been caused by a corrupted state that requires a fresh start. + +### [Callback](/workflow/rest/dlq/callback) +- **What it does:** Reruns the failure callback for a workflow run, in case the original failure callback was not delivered or failed. 
+- **When to use:** Use this to ensure your system is notified of workflow failures, even if the original callback attempt did not succeed. + + ## Debugging failed runs In your DLQ, filter messages via the `Workflow URL` or `Workflow Run ID` to search for a particular failure. We include all request and response headers and bodies to simplify debugging failed runs. diff --git a/workflow/rest/dlq/callback.mdx b/workflow/rest/dlq/callback.mdx new file mode 100644 index 00000000..8aec9b75 --- /dev/null +++ b/workflow/rest/dlq/callback.mdx @@ -0,0 +1,45 @@ +--- +title: "Rerun Failure Callback for Workflow Run" +description: "Rerun the failure callback for a failed workflow run in the DLQ" +api: "POST https://qstash.upstash.io/v2/workflows/dlq/callback/{dlqId}" +authMethod: "bearer" +--- + +If the failure callback for a workflow run has failed, you can use this endpoint to manually trigger the failure callback again. +This is useful for ensuring that your system is notified of workflow failures even if the original callback attempt did not succeed. + +The state of the failure callback for each workflow run is included in the DLQ message response as `failureCallbackInfo.state`. +You can filter for all workflow runs with a failed failure callback by using the `failureCallbackState` filter when listing workflow runs in the DLQ with the `/v2/workflows/dlq` endpoint. + +## Request + + + The DLQ id of the failed workflow run for which you want to rerun the failure callback. You can find this id when listing all workflow runs in the DLQ with the [/v2/workflows/dlq](/workflow/rest/dlq/list) endpoint. + + +## Response + + + The ID of the workflow run for which the failure callback was rerun. + + + Unix timestamp (in milliseconds) when the workflow run was created. 
+ + + + +```sh +curl -X POST "https://qstash.upstash.io/v2/workflows/dlq/callback/my-dlq-id" \ + -H "Authorization: Bearer " +``` + + + + +```json 200 OK +{ + "workflowRunId": "wfr_abcde", + "workflowCreatedAt": 1680000000000 +} +``` + diff --git a/workflow/rest/dlq/delete.mdx b/workflow/rest/dlq/delete.mdx new file mode 100644 index 00000000..f8ed3906 --- /dev/null +++ b/workflow/rest/dlq/delete.mdx @@ -0,0 +1,31 @@ +--- +title: "Delete a failed workflow run from the DLQ" +description: "Manually remove a failed workflow run from the DLQ" +api: "DELETE https://qstash.upstash.io/v2/workflows/dlq/{dlqId}" +authMethod: "bearer" +--- + +Delete a failed workflow run from the Dead Letter Queue (DLQ). + +When a workflow run fails, it is moved to the DLQ. You can manually remove a failed workflow run from the DLQ using this endpoint. This is useful for cleaning up failed runs that you no longer wish to retry or analyze. + +## Request + + + The DLQ id of the failed workflow run you want to remove. You will see this id when + listing all workflow runs in the DLQ with the [/v2/workflows/dlq](/workflow/rest/dlq/list) endpoint. + + +## Response + +The endpoint doesn't return a response body. A status code of 200 means the workflow run was removed from the DLQ. +If the workflow run is not found in the DLQ (either it has already been removed by you, or automatically), the endpoint returns a 404 status code. + + + +```sh +curl -X DELETE https://qstash.upstash.io/v2/workflows/dlq/my-dlq-id \ + -H "Authorization: Bearer " +``` + + diff --git a/workflow/rest/dlq/get.mdx b/workflow/rest/dlq/get.mdx new file mode 100644 index 00000000..fba85e9f --- /dev/null +++ b/workflow/rest/dlq/get.mdx @@ -0,0 +1,71 @@ +--- +title: "Get a failed workflow run from the DLQ" +description: "Get a single failed workflow run from the DLQ" +api: "GET https://qstash.upstash.io/v2/workflows/dlq/{dlqId}" +authMethod: "bearer" +--- + +Get a single failed workflow run from the Dead Letter Queue (DLQ). 
+ +## Request + + + The DLQ id of the failed workflow run you want to retrieve. You will see this id when + listing all workflow runs in the DLQ with the [/v2/workflows/dlq](/workflow/rest/dlq/list) endpoint. + + +## Response + +If the workflow run is not found in the DLQ (either it has already been removed by you, or automatically), the endpoint returns a 404 status code. + + + + + +```sh +curl -X GET https://qstash.upstash.io/v2/workflows/dlq/my-dlq-id \ + -H "Authorization: Bearer " +``` + + + + +```json 200 OK +{ + "messageId":"msg_26hZCxZCuWyyTWPmSVBrNC1RADwpgWxPcak2rQD51EMjFMuzcW7qYXpPiDyw8Gd", + "url":"https://my.app/workflow", + "method":"POST", + "header":{ + "Content-Type":[ + "application/json" + ] + }, + "maxRetries":10, + "notBefore":1752829294505, + "createdAt":1752829294505, + "failureCallback":"https://my.app/workflow", + "callerIP":"88.240.188.2", + "workflowRunId":"wfr_5XAx4IJergqkGK1v23VzR", + "workflowCreatedAt":1752829293531, + "workflowUrl":"https://my.app/workflow", + "responseStatus":489, + "responseHeader":{ + "Content-Type":[ + "text/plain;charset=UTF-8" + ] + }, + "responseBody":"{\"error\":\"WorkflowNonRetryableError\",\"message\":\"this workflow has stopped\"}", + "failureCallbackInfo":{ + "state":"CALLBACK_SUCCESS", + "responseStatus":200, + "responseBody":"{\"workflowRunId\":\"wfr_Q_khHG-a414M-xKRh2kNI\"}", + "responseHeaders":{ + "Content-Type":[ + "text/plain;charset=UTF-8" + ] + } + }, + "dlqId":"1752829295505-0" +} +``` + diff --git a/workflow/rest/dlq/list.mdx b/workflow/rest/dlq/list.mdx new file mode 100644 index 00000000..f07337f3 --- /dev/null +++ b/workflow/rest/dlq/list.mdx @@ -0,0 +1,106 @@ +--- +title: "List workflow runs in the DLQ" +description: "List and paginate through all failed workflow runs currently inside the DLQ" +api: "GET https://qstash.upstash.io/v2/workflows/dlq" +authMethod: "bearer" +--- + +List all failed workflow runs currently inside the Dead Letter Queue. 
+ +## Request + + + By providing a cursor you can paginate through all of the workflow runs in the DLQ + + + Filter DLQ workflow runs by workflow run id. + + + Filter DLQ workflow runs by workflow url. + + + Filter DLQ workflow runs by starting date, in milliseconds (Unix timestamp). This is inclusive. + + + Filter DLQ workflow runs by ending date, in milliseconds (Unix timestamp). This is inclusive. + + + Filter DLQ workflow runs by HTTP response status code. + + + Filter DLQ workflow runs by IP address of the publisher. + + + Filter DLQ workflow runs by the state of failure callback (failure function or failure URL) + + + The number of workflow runs to return. Default and maximum is 100. + + +## Response + + + A cursor which you can use in subsequent requests to paginate through all + workflow runs. If no cursor is returned, you have reached the end of the workflow runs. + + + + + + + + + + +```sh +curl https://qstash.upstash.io/v2/workflows/dlq \ + -H "Authorization: Bearer " +``` + + + + +```json 200 OK +{ + "cursor":"1752570296426-0", + "messages":[ + { + "messageId":"msg_26hZCxZCuWyyTWPmSVBrNC1RADwpgWxPcak2rQD51EMjFMuzcW7qYXpPiDyw8Gd", + "url":"https://my.app/workflow", + "method":"POST", + "header":{ + "Content-Type":[ + "application/json" + ] + }, + "maxRetries":10, + "notBefore":1752829294505, + "createdAt":1752829294505, + "failureCallback":"https://my.app/workflow", + "callerIP":"88.240.188.2", + "workflowRunId":"wfr_5XAx4IJergqkGK1v23VzR", + "workflowCreatedAt":1752829293531, + "workflowUrl":"https://my.app/workflow", + "responseStatus":489, + "responseHeader":{ + "Content-Type":[ + "text/plain;charset=UTF-8" + ] + }, + "responseBody":"{\"error\":\"WorkflowNonRetryableError\",\"message\":\"this workflow has stopped\"}", + "failureCallbackInfo":{ + "state":"CALLBACK_SUCCESS", + "responseStatus":200, + "responseBody":"{\"workflowRunId\":\"wfr_Q_khHG-a414M-xKRh2kNI\"}", + "responseHeaders":{ + "Content-Type":[ + "text/plain;charset=UTF-8" + ] + } + }, + 
"dlqId":"1752829295505-0" + } + ] +} +``` +