-
Notifications
You must be signed in to change notification settings - Fork 150
Add filter class to dask and do the tests for it #283
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Codecov Report
@@ Coverage Diff @@
## master #283 +/- ##
=========================================
- Coverage 94.69% 93.79% -0.9%
=========================================
Files 13 13
Lines 1620 1644 +24
=========================================
+ Hits 1534 1542 +8
- Misses 86 102 +16
Continue to review full report at Codecov.
|
streamz/tests/test_dask.py
Outdated
@@ -131,6 +131,67 @@ def test_buffer(c, s, a, b): | |||
assert source.loop == c.loop | |||
|
|||
|
|||
@pytest.mark.slow | |||
def test_filter(backend): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to remove the backend
arg here, and in the scatter
statement. Streamz only has a dask backend at the moment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should I remove the backend for all the four tests I have added?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes please
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already changed the four tests but the build is still failed
@@ -140,6 +157,24 @@ def update(self, x, who=None): | |||
return self._emit(result) | |||
|
|||
|
|||
@DaskStream.register_api() | |||
class filter(DaskStream): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you also need the modifications to the gather and other nodes as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made changes to gather already. I also compared other nodes. There is a slightly difference between the dask.starmap and parallel.starmap. Do I need to change that one?
Hi, @martindurant @mrocklin, this is Zifan. I'm working on this branch with @CJ-Wright . We added class filter() to dask and modified class gather() according to parallel.py (including the associated tests). However, we ran into an issue which causes the test test_integration_from_stream fail in the test_dataframe.py. The test passes on the Stream but fails on Daskstream. After troubleshooting for a while, we guess there is something wrong with the process of serializing the code to different dask clusters, but we are not sure what the specific issue is. Can we pick your brains about how we can resolve it? Thanks! |
Sorry for not following up here - are you still interested in this, @xuzifan08 ? |
Added filter class to dask.py and four tests associated with it.