-
Notifications
You must be signed in to change notification settings - Fork 5.9k
[Bug]: A Workflow
runs properly when .run()
, but not when .run(stepwise=True)
#18334
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
OMG, I found an issue with step-wise execution: #17898. Initially I thought that it's unrelated to mine. But OMFG, if you look at the code: #17898 (comment), it has similar event flow |
The issue you're experiencing with step-wise execution in the Llama Index project is indeed related to a known problem discussed in GitHub issue #17898. This issue highlights that step-wise execution can fail when steps involve asynchronous operations, which seems to be the case in your workflow. A related pull request (#17914) has been merged to address this problem, so it might be worth checking if the changes from this pull request resolve your issue. Additionally, the To continue talking to Dosu, mention @dosu. Help Dosu learn! Give it feedback: Great Response | Irrelevant Answer | Incorrect Sources | Too Verbose | Hallucination | Bug Report | Other |
dosu tried hard, but in my code there is no asynchronous operations in steps |
UPDATE: I could simplify the example reducing |
I've added some async def main():
w = TestWorkflow(verbose=True)
handler = w.run(stepwise=True)
while produced_events := await handler.run_step():
print("Produced events: " + str(produced_events))
for ev in produced_events:
handler.ctx.send_event(ev)
print("Awaiting results")
print("Buffer: " + str(handler.ctx._events_buffer))
result = await handler
print("Finished") Which produces these logs (with exception at the end):
This puzzles me: why when |
From what I can see, the return value of async def main():
if len(sys.argv) > 1 and sys.argv[1] == "step":
w: Workflow = TestWorkflow(verbose=True, timeout=1)
handler = w.run(stepwise=True)
while not handler.is_done():
produced_events = await handler.run_step()
for ev in produced_events or []:
handler.ctx.send_event(ev)
print("Awaiting results")
result = await handler
print("Finished with result", result)
print("Exception", handler.exception())
else:
w = TestWorkflow(verbose=True)
handler = w.run()
print("Awaiting results")
result = await handler
print("Finished with result", result)
try:
asyncio.run(main())
except Exception as e:
print("Exception", e) This gets closer, however something in here with the workflow coroutine causes the task to fail after the final print
|
Strangely, the invalid state exception disappears if there's an async pause inserted after sending events w: Workflow = TestWorkflow(verbose=True, timeout=1)
handler = w.run(stepwise=True)
while not handler.is_done():
produced_events = await handler.run_step()
for ev in produced_events or []:
handler.ctx.send_event(ev)
await asyncio.sleep(0) # <---------
print("Awaiting results")
result = await handler
print("Finished with result", result) |
Bug Description
When running the workflow (code listed in Steps to reproduce) in stepwise mode, one of the steps didn't work.
But when I run the workflows as always (with
.run()
) it works.I knew something was off! Hope this time I didn't make a mistake.
Version
0.12.25
Steps to Reproduce
Diagram:
When run in step-wise mode:
Produces these logs that are in the specialized section of the issue.
But when run simply with
.run()
, everything is perfect:Relevant Logs/Tracebacks
The text was updated successfully, but these errors were encountered: