[Bug]: A Workflow runs properly when .run(), but not when .run(stepwise=True) #18334

Open
InAnYan opened this issue Apr 1, 2025 · 8 comments · May be fixed by #18470
Labels
bug (Something isn't working), triage (Issue needs to be triaged/prioritized)

Comments

Contributor

InAnYan commented Apr 1, 2025

Bug Description

When I run the workflow (code listed in Steps to Reproduce) in stepwise mode, it never finishes: after make_b produces EventB, the finish step is not run again and the workflow times out.

But when I run the workflow normally (with .run()), it works.

I knew something was off! Hope this time I didn't make a mistake.

Version

0.12.25

Steps to Reproduce

Diagram: (image of the workflow not included)

import asyncio
from typing import Optional
from llama_index.core.workflow import Context, Event, StartEvent, StopEvent, Workflow, step


class EventA(Event):
    pass


class EventB(Event):
    pass


class TestWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> EventA:
        return EventA()

    @step
    async def make_b(self, ev: EventA) -> EventB:
        return EventB()

    @step
    async def finish(self, ctx: Context, evs: EventA | EventB) -> Optional[StopEvent]:
        got = ctx.collect_events(evs, [EventA, EventB])
        
        if not got:
            return None
        
        return StopEvent()


async def main():
    w = TestWorkflow(verbose=True)
    await w.run()


asyncio.run(main())

When run in step-wise mode:

async def main():
    w = TestWorkflow(verbose=True)
    handler = w.run(stepwise=True)

    while produced_events := await handler.run_step():
        for ev in produced_events:
            handler.ctx.send_event(ev)

    print("Awaiting results")

    result = await handler

    print("Finished")


asyncio.run(main())

This produces the logs shown under Relevant Logs/Tracebacks.

But when run normally with .run(), everything works:

Running step start
Step start produced event EventA
Running step finish
Step finish produced no event
Running step make_b
Step make_b produced event EventB
Running step finish
Step finish produced event StopEvent
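
The two runs of finish in this log are consistent with how ctx.collect_events is documented to behave: it returns None until every expected event type has arrived, then returns them together. A minimal toy model of that buffering is sketched below. ToyCollector and its collect method are hypothetical names made up for illustration; this is not the library's implementation.

```python
from collections import defaultdict


class EventA:
    pass


class EventB:
    pass


class ToyCollector:
    """Hypothetical stand-in for the buffering idea behind collect_events."""

    def __init__(self):
        self.buffer = defaultdict(list)

    def collect(self, ev, expected):
        # Store the incoming event under its type.
        self.buffer[type(ev)].append(ev)
        # If any expected type is still missing, there is nothing to return yet.
        if any(not self.buffer[t] for t in expected):
            return None
        # Everything arrived: take one event per expected type, in the requested order.
        return [self.buffer[t].pop(0) for t in expected]


collector = ToyCollector()
first = collector.collect(EventA(), [EventA, EventB])   # only EventA so far
second = collector.collect(EventB(), [EventA, EventB])  # both types are now buffered
print(first)
print([type(e).__name__ for e in second])
```

In this model the first call corresponds to "Step finish produced no event" and the second to "Step finish produced event StopEvent".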

Relevant Logs/Tracebacks

Running step start
Step start produced event EventA
Running step finish
Step finish produced no event
Running step make_b
Step make_b produced event EventB
Awaiting results
Exception in callback Dispatcher.span.<locals>.wrapper.<locals>.handle_future_result()() at [REDACTED]/.venv/lib/python3.13/site-packages/llama_index/core/instrumentation/dispatcher.py:274
handle: <Handle Dispatcher.span.<locals>.wrapper.<locals>.handle_future_result()() at [REDACTED]/.venv/lib/python3.13/site-packages/llama_index/core/instrumentation/dispatcher.py:274>
Traceback (most recent call last):
  File "/usr/lib/python3.13/asyncio/events.py", line 89, in _run
    self._context.run(self._callback, *self._args)
    ~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "[REDACTED]/.venv/lib/python3.13/site-packages/llama_index/core/instrumentation/dispatcher.py", line 286, in handle_future_result
    raise exception
  File "[REDACTED]/.venv/lib/python3.13/site-packages/llama_index/core/workflow/workflow.py", line 401, in _run_workflow
    raise WorkflowTimeoutError(msg)
llama_index.core.workflow.errors.WorkflowTimeoutError: Operation timed out after 10.0 seconds
Traceback (most recent call last):
  File "[REDACTED]/test.py", line 56, in <module>
    asyncio.run(main())
    ~~~~~~~~~~~^^^^^^^^
  File "/usr/lib/python3.13/asyncio/runners.py", line 195, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
  File "/usr/lib/python3.13/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "/usr/lib/python3.13/asyncio/base_events.py", line 725, in run_until_complete
    return future.result()
           ~~~~~~~~~~~~~^^
  File "[REDACTED]/test.py", line 43, in main
    result = await handler
             ^^^^^^^^^^^^^
  File "[REDACTED]/.venv/lib/python3.13/site-packages/llama_index/core/workflow/workflow.py", line 401, in _run_workflow
    raise WorkflowTimeoutError(msg)
llama_index.core.workflow.errors.WorkflowTimeoutError: Operation timed out after 10.0 seconds

InAnYan added the bug and triage labels on Apr 1, 2025

Contributor Author

InAnYan commented Apr 1, 2025

OMG, I found another issue with step-wise execution: #17898.

Initially I thought it was unrelated to mine. But if you look at the code in #17898 (comment), it has a similar event flow.


dosubot bot commented Apr 1, 2025

The issue you're experiencing with step-wise execution in the LlamaIndex project is indeed related to a known problem discussed in GitHub issue #17898. This issue highlights that step-wise execution can fail when steps involve asynchronous operations, which seems to be the case in your workflow. A related pull request (#17914) has been merged to address this problem, so it might be worth checking if the changes from this pull request resolve your issue.

Additionally, the WorkflowTimeoutError you're encountering is likely due to a synchronization issue between WorkflowHandler.run_step() and Workflow._start()._task(), especially when a step function includes an await operation. A suggested solution involves using a while loop with an assignment expression to ensure proper synchronization and prevent deadlocks [1].

Contributor Author

InAnYan commented Apr 1, 2025

Dosu tried hard, but in my code there are no asynchronous operations in the steps.

Contributor Author

InAnYan commented Apr 1, 2025

UPDATE: I was able to simplify the example by removing EventC (which corresponds to AccEvent2 in the issue I referenced).

Contributor Author

InAnYan commented Apr 1, 2025

I've added some print statements to observe what's inside:

async def main():
    w = TestWorkflow(verbose=True)

    handler = w.run(stepwise=True)

    while produced_events := await handler.run_step():
        print("Produced events: " + str(produced_events))
        for ev in produced_events:
            handler.ctx.send_event(ev)

    print("Awaiting results")
    print("Buffer: " + str(handler.ctx._events_buffer))

    result = await handler

    print("Finished")

Which produces these logs (with exception at the end):

Running step start
Step start produced event EventA
Produced events: [EventA()]
Running step finish
Step finish produced no event
Running step make_b
Step make_b produced event EventB
Awaiting results
Buffer: defaultdict(<class 'list'>, {'__main__.EventA': [EventA()]})

This puzzles me: after step make_b produces EventB, why isn't control returned to the code inside the loop? Instead, it exits the while loop. Where did EventB go?
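
One detail of the loop condition is worth isolating from the library's internals: `while produced_events := await handler.run_step()` stops at the first falsy return value, so a None (or an empty list) ends the loop whether or not the workflow has finished. The sketch below uses a made-up ToyHandler with scripted results to contrast that loop with one driven by a separate completion check. ToyHandler, STEPS, and the string "events" are hypothetical stand-ins, not the llama_index WorkflowHandler, and this is not a claim about what run_step does internally.

```python
import asyncio


class ToyHandler:
    """Hypothetical stand-in with scripted results; NOT llama_index's WorkflowHandler."""

    def __init__(self, step_results):
        self._results = list(step_results)

    def is_done(self):
        # Done once every scripted result has been consumed.
        return not self._results

    async def run_step(self):
        return self._results.pop(0)


# What each call "produces"; None models a step that returned no event.
STEPS = [["EventA"], None, ["EventB"], ["StopEvent"]]


async def walrus_loop():
    handler = ToyHandler(STEPS)
    seen = []
    # Exits as soon as run_step() returns something falsy.
    while produced := await handler.run_step():
        seen.extend(produced)
    return seen


async def is_done_loop():
    handler = ToyHandler(STEPS)
    seen = []
    # Keeps going until the handler itself reports completion.
    while not handler.is_done():
        produced = await handler.run_step()
        seen.extend(produced or [])
    return seen


walrus_seen = asyncio.run(walrus_loop())
is_done_seen = asyncio.run(is_done_loop())
print(walrus_seen)   # ['EventA']
print(is_done_seen)  # ['EventA', 'EventB', 'StopEvent']
```

In the toy model, the first loop never observes EventB because it has already exited on the None that precedes it.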

Contributor

adrianlyjak commented Apr 6, 2025

@InAnYan

From what I can see, the return value of await handler.run_step() is Optional. A None returned from that function does not mean that the workflow is complete; rather, it means that a step returned None (as yours does in finish). In the examples, I see while not handler.is_done() as the recommended way of stepping through the workflow, although some of that example code seems outdated, as it's not returning a list.

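import sys  # needed for sys.argv; the other imports and TestWorkflow are as in the original example


async def main():

    if len(sys.argv) > 1 and sys.argv[1] == "step":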

        w: Workflow = TestWorkflow(verbose=True, timeout=1)
        handler = w.run(stepwise=True)

        while not handler.is_done():
            produced_events = await handler.run_step()
            for ev in produced_events or []:
                handler.ctx.send_event(ev)

        print("Awaiting results")
        result = await handler
        print("Finished with result", result)
        print("Exception", handler.exception())

    else:
        w = TestWorkflow(verbose=True)
        handler = w.run()

        print("Awaiting results")
        result = await handler

        print("Finished with result", result)


try:
    asyncio.run(main())
except Exception as e:
    print("Exception", e)

This gets closer; however, something in the workflow coroutine causes the task to fail after the final print:

Awaiting results
Finished with result None
Exception None
Task exception was never retrieved
future: <Task finished name='Task-16' coro=<Workflow.run.<locals>._run_workflow() done, defined at /.../llama_index/llama-index-core/llama_index/core/workflow/workflow.py:347> exception=InvalidStateError('invalid state')>
Traceback (most recent call last):
  File "/.../llama_index/llama-index-core/llama_index/core/workflow/workflow.py", line 407, in _run_workflow
    result.set_result(ctx._retval)
asyncio.exceptions.InvalidStateError: invalid state

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/.../llama_index/llama-index-core/llama_index/core/workflow/workflow.py", line 409, in _run_workflow
    result.set_exception(e)
asyncio.exceptions.InvalidStateError: invalid state
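
For context on this traceback: an asyncio future raises InvalidStateError when set_result or set_exception is called after the future is already done. Whether that is what happens to `result` inside _run_workflow here is an assumption, not something confirmed in this thread. A minimal standalone reproduction of the general behavior, using only the standard library:

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    fut.set_result("first")  # the future is now done

    errors = []
    try:
        fut.set_result("second")  # a second result is rejected
    except asyncio.InvalidStateError as exc:
        errors.append(type(exc).__name__)
    try:
        fut.set_exception(RuntimeError("boom"))  # so is a late exception
    except asyncio.InvalidStateError as exc:
        errors.append(type(exc).__name__)

    return fut.result(), errors


value, errors = asyncio.run(main())
print(value, errors)
```

The pair of errors mirrors the shape of the traceback: the failed set_result is followed by a failed set_exception on the same future.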

Contributor

adrianlyjak commented Apr 6, 2025

Strangely, the InvalidStateError disappears if an async pause is inserted after sending each event:

w: Workflow = TestWorkflow(verbose=True, timeout=1)
handler = w.run(stepwise=True)

while not handler.is_done():
    produced_events = await handler.run_step()
    for ev in produced_events or []:
        handler.ctx.send_event(ev)
        await asyncio.sleep(0) # <---------

print("Awaiting results")
result = await handler
print("Finished with result", result)
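
As background on why the pause might matter: `await asyncio.sleep(0)` hands control back to the event loop for one iteration, so tasks that are already scheduled get a chance to run before the caller continues. The sketch below shows only that general asyncio behavior with made-up coroutine names. It does not show which internal llama_index task benefits from the yield, which remains unknown in this thread.

```python
import asyncio

order = []


async def background():
    order.append("background ran")


async def without_yield():
    task = asyncio.create_task(background())  # scheduled, not yet started
    order.append("caller continues (no yield)")
    await task


async def with_yield():
    task = asyncio.create_task(background())
    await asyncio.sleep(0)  # let the event loop run already-scheduled tasks
    order.append("caller continues (after yield)")
    await task


asyncio.run(without_yield())
asyncio.run(with_yield())
print(order)
```

Without the yield, the caller's next line runs before the scheduled task starts; with it, the task runs first.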
