I've created a workflow with 4 activity which runs one after another using temporalio, When my one workflow starts it is not running in background, Unable to trigger new workflow until the current one finishes. I'm calling this workflow from django.
import asyncio
import random
import concurrent
import multiprocessing
from datetime import timedelta
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio import activity, workflow
from temporalio.worker import SharedStateManager, Worker
async def get_client():
client = await Client.connect("localhost:7233")
return client
@activity.defn
async def activity1(params) -> dict:
print("Inside activity 1")
count = 1
while True:
if count == 10000000000:
break
count = count + 1
return params
@activity.defn
async def activity2(params) -> dict:
print("Inside activity 2")
return params
@activity.defn
async def activity3(params) -> dict:
print("Inside activity 3")
return params
@activity.defn
async def activity4(params) -> dict:
print("Inside activity 4")
return params
@workflow.defn
class TestTemporalWorkflow:
@workflow.run
async def run(self, params) -> dict:
response_1 = await workflow.start_activity(
activity1,
params,
start_to_close_timeout=timedelta(seconds=100),
)
response_2 = await workflow.start_activity(
activity2,
response_1,
start_to_close_timeout=timedelta(seconds=100),
)
response_3 = await workflow.start_activity(
activity3,
response_2,
start_to_close_timeout=timedelta(seconds=100),
)
response_4 = await workflow.start_activity(
activity4,
response_3,
start_to_close_timeout=timedelta(seconds=100),
)
async def main(params):
task_queue = "demo-temporal-workflow-queue"
workflows = [TestTemporalWorkflow]
activities = [activity1, activity2, activity3, activity4]
# Start client
client = await get_client()
async with Worker(
client,
task_queue=task_queue,
workflows=workflows,
activities=activities,
activity_executor=concurrent.futures.Executor,
workflow_task_executor=concurrent.futures.ThreadPoolExecutor(100),
shared_state_manager=SharedStateManager.create_from_multiprocessing(
multiprocessing.Manager()),
):
workflow_id = f"{random.randint(555,99999)}"
params = {"test" : "hellow user"}
await client.execute_workflow(
TestTemporalWorkflow.run,
params,
id=workflow_id,
task_queue=task_queue
)
if __name__ == "__main__":
asyncio.run(main({"test" : "test"}))
"""
My workflow is not returning the response, it's going to queue or deadlock and i'm not able to trigger another workflow unit
current workflow is not finished.
"""