We are using Sentry to monitor our production bugs, and since we picked Temporal to run our background tasks (workflows and activities) we don't have Sentry logging anymore.
Is it possible to configure our worker to set up the Sentry SDK and forward errors raised in workflows and activities to Sentry?
Answer
You can configure your worker with a Temporal interceptor that catches every exception raised during the execution of your activities and workflows.
First, define the SentryInterceptor interceptor. Interceptor file:
from dataclasses import asdict, is_dataclass
from typing import Any, Optional, Type, Union
from temporalio import activity, workflow
from temporalio.worker import (
ActivityInboundInterceptor,
ExecuteActivityInput,
ExecuteWorkflowInput,
Interceptor,
WorkflowInboundInterceptor,
WorkflowInterceptorClassInput,
)
with workflow.unsafe.imports_passed_through():
from sentry_sdk import Scope, isolation_scope
def _set_common_workflow_tags(scope: Scope, info: Union[workflow.Info, activity.Info]):
    """Tag ``scope`` with the workflow type and workflow id read from ``info``.

    ``info`` may be either a workflow or an activity info object; only its
    ``workflow_type`` and ``workflow_id`` attributes are read, so the same
    helper serves both interceptors.
    """
    shared_tags = (
        ("temporal.workflow.type", info.workflow_type),
        ("temporal.workflow.id", info.workflow_id),
    )
    for tag_name, tag_value in shared_tags:
        scope.set_tag(tag_name, tag_value)
class _SentryActivityInboundInterceptor(ActivityInboundInterceptor):
    """Activity interceptor that reports exceptions raised by activities to Sentry."""

    async def execute_activity(self, input: ExecuteActivityInput) -> Any:
        """Execute the activity inside an isolated, tagged Sentry scope.

        The scope is tagged with the activity's module path and with workflow /
        activity identifiers taken from ``activity.info()``. If the activity
        raises, the activity info (and the input, when it is a single dataclass
        instance) is attached as context, the exception is captured on the
        scope, and the same exception is re-raised to the caller.

        Returns:
            Whatever the next interceptor in the chain returns.

        Raises:
            Exception: any exception raised by the activity, unchanged.
        """
        # https://docs.sentry.io/platforms/python/troubleshooting/#addressing-concurrency-issues
        with isolation_scope() as scope:
            scope.set_tag("temporal.execution_type", "activity")
            scope.set_tag("module", input.fn.__module__ + "." + input.fn.__qualname__)

            activity_info = activity.info()
            _set_common_workflow_tags(scope, activity_info)
            scope.set_tag("temporal.activity.id", activity_info.activity_id)
            scope.set_tag("temporal.activity.type", activity_info.activity_type)
            scope.set_tag("temporal.activity.task_queue", activity_info.task_queue)
            scope.set_tag(
                "temporal.workflow.namespace", activity_info.workflow_namespace
            )
            scope.set_tag("temporal.workflow.run_id", activity_info.workflow_run_id)
            try:
                return await super().execute_activity(input)
            except Exception:
                if len(input.args) == 1:
                    [arg] = input.args
                    # is_dataclass() is also true for dataclass *types*, which
                    # asdict() rejects with TypeError; that error would replace
                    # the activity's own exception, so only convert instances.
                    if is_dataclass(arg) and not isinstance(arg, type):
                        scope.set_context("temporal.activity.input", asdict(arg))
                scope.set_context("temporal.activity.info", activity_info.__dict__)
                # With no argument, capture_exception() reports the exception
                # currently being handled.
                scope.capture_exception()
                # Bare raise re-raises the original exception as-is.
                raise
class _SentryWorkflowInterceptor(WorkflowInboundInterceptor):
    """Workflow interceptor that reports exceptions raised by workflows to Sentry."""

    async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
        """Execute the workflow inside an isolated, tagged Sentry scope.

        The scope is tagged with the workflow's module path and with identifiers
        taken from ``workflow.info()``. If the workflow raises, the workflow info
        (and the input, when it is a single dataclass instance) is attached as
        context and the exception is captured on the scope, but only when the
        workflow is not replaying. The same exception is then re-raised.

        Returns:
            Whatever the next interceptor in the chain returns.

        Raises:
            Exception: any exception raised by the workflow, unchanged.
        """
        # https://docs.sentry.io/platforms/python/troubleshooting/#addressing-concurrency-issues
        with isolation_scope() as scope:
            scope.set_tag("temporal.execution_type", "workflow")
            scope.set_tag(
                "module", input.run_fn.__module__ + "." + input.run_fn.__qualname__
            )

            workflow_info = workflow.info()
            _set_common_workflow_tags(scope, workflow_info)
            scope.set_tag("temporal.workflow.task_queue", workflow_info.task_queue)
            scope.set_tag("temporal.workflow.namespace", workflow_info.namespace)
            scope.set_tag("temporal.workflow.run_id", workflow_info.run_id)
            try:
                return await super().execute_workflow(input)
            except Exception:
                if len(input.args) == 1:
                    [arg] = input.args
                    # is_dataclass() is also true for dataclass *types*, which
                    # asdict() rejects with TypeError; that error would replace
                    # the workflow's own exception, so only convert instances.
                    if is_dataclass(arg) and not isinstance(arg, type):
                        scope.set_context("temporal.workflow.input", asdict(arg))
                scope.set_context("temporal.workflow.info", workflow_info.__dict__)

                # Report only when not replaying (presumably so that replayed
                # history does not produce duplicate Sentry events).
                if not workflow.unsafe.is_replaying():
                    # Capture with the workflow sandbox restrictions lifted.
                    with workflow.unsafe.sandbox_unrestricted():
                        scope.capture_exception()
                # Bare raise re-raises the original exception as-is.
                raise
class SentryInterceptor(Interceptor):
    """Worker interceptor that forwards workflow and activity exceptions to Sentry."""

    def intercept_activity(
        self, next: ActivityInboundInterceptor
    ) -> ActivityInboundInterceptor:
        """Wrap the activity interceptor chain with the Sentry activity interceptor.

        Implementation of
        :py:meth:`temporalio.worker.Interceptor.intercept_activity`.
        """
        downstream = super().intercept_activity(next)
        return _SentryActivityInboundInterceptor(downstream)

    def workflow_interceptor_class(
        self, input: WorkflowInterceptorClassInput
    ) -> Optional[Type[WorkflowInboundInterceptor]]:
        """Return the Sentry workflow interceptor class; ``input`` is not used."""
        return _SentryWorkflowInterceptor
Then plug the interceptor into your worker:
import asyncio
import os

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker

with workflow.unsafe.imports_passed_through():
    import sentry_sdk

from .activities import compose_greeting
from .sentry.interceptor import SentryInterceptor
from .workflows import GreetingWorkflow
async def main():
    """Initialize Sentry, connect to Temporal, and run the worker."""
    # Uncomment the line below to see logging
    # logging.basicConfig(level=logging.INFO)

    # Initialize the Sentry SDK; the DSN comes from the SENTRY_DSN
    # environment variable (None when it is unset).
    sentry_dsn = os.environ.get("SENTRY_DSN")
    sentry_sdk.init(dsn=sentry_dsn)

    # Start client
    temporal_client = await Client.connect("localhost:7233")

    # Run a worker for the workflow, with SentryInterceptor handling
    # error reporting for both workflows and activities.
    sentry_worker = Worker(
        temporal_client,
        task_queue="sentry-task-queue",
        workflows=[GreetingWorkflow],
        activities=[compose_greeting],
        interceptors=[SentryInterceptor()],
    )
    await sentry_worker.run()


if __name__ == "__main__":
    asyncio.run(main())
This will intercept the exceptions and forward them to Sentry, bypassing the workflow importer sandbox.