From 014ccf5765521374cf92bb07e7166218854b7d98 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sat, 14 Oct 2017 13:18:28 -0400 Subject: [PATCH] wip --- streamz/dask.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/streamz/dask.py b/streamz/dask.py index 2049fd88..b83c395e 100644 --- a/streamz/dask.py +++ b/streamz/dask.py @@ -90,11 +90,52 @@ def update(self, x, who=None): return client.sync(self._update, x, who) +@DaskStream.register_api() +class buffer(DaskStream, core.buffer): + pass + + +@DaskStream.register_api() +class combine_latest(DaskStream, core.combine_latest): + pass + + +@DaskStream.register_api() +class delay(DaskStream, core.delay): + pass + + +@DaskStream.register_api() +class partition(DaskStream, core.partition): + pass + + +@DaskStream.register_api() +class rate_limit(DaskStream, core.rate_limit): + pass + + +@DaskStream.register_api() +class sliding_window(DaskStream, core.sliding_window): + pass + + +@DaskStream.register_api() +class timed_window(DaskStream, core.timed_window): + pass + + +@DaskStream.register_api() +class union(DaskStream, core.union): + pass + + @DaskStream.register_api() class zip(DaskStream, core.zip): pass +""" @DaskStream.register_api() class buffer(DaskStream): def __init__(self, child, n, loop=None): @@ -120,3 +161,4 @@ def _update(self, x, who=None): def update(self, x, who=None): return self.queue.put(x) +"""