Skip to content

Commit 284fea0

Browse files
Merge pull request #506 from davidbrochart/async_client
Add async API
2 parents c1b85ea + e0c8db0 commit 284fea0

File tree

6 files changed

+460
-2
lines changed

6 files changed

+460
-2
lines changed

.travis.yml

-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ python:
66
- "3.7"
77
- "3.6"
88
- "3.5"
9-
- "2.7"
109
install:
1110
- pip install --upgrade setuptools pip
1211
- pip install --upgrade --upgrade-strategy eager --pre -e .[test] pytest-cov codecov 'coverage<5'

jupyter_client/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@
66
from .client import KernelClient
77
from .manager import KernelManager, run_kernel
88
from .blocking import BlockingKernelClient
9+
from .asynchronous import AsyncKernelClient
910
from .multikernelmanager import MultiKernelManager
+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .client import AsyncKernelClient
+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
"""Async channels"""
2+
3+
# Copyright (c) Jupyter Development Team.
4+
# Distributed under the terms of the Modified BSD License.
5+
6+
from queue import Queue, Empty
7+
8+
9+
class ZMQSocketChannel(object):
10+
"""A ZMQ socket in an async API"""
11+
session = None
12+
socket = None
13+
stream = None
14+
_exiting = False
15+
proxy_methods = []
16+
17+
def __init__(self, socket, session, loop=None):
18+
"""Create a channel.
19+
20+
Parameters
21+
----------
22+
socket : :class:`zmq.asyncio.Socket`
23+
The ZMQ socket to use.
24+
session : :class:`session.Session`
25+
The session to use.
26+
loop
27+
Unused here, for other implementations
28+
"""
29+
super(ZMQSocketChannel, self).__init__()
30+
31+
self.socket = socket
32+
self.session = session
33+
34+
async def _recv(self, **kwargs):
35+
msg = await self.socket.recv_multipart(**kwargs)
36+
ident,smsg = self.session.feed_identities(msg)
37+
return self.session.deserialize(smsg)
38+
39+
async def get_msg(self, timeout=None):
40+
""" Gets a message if there is one that is ready. """
41+
if timeout is not None:
42+
timeout *= 1000 # seconds to ms
43+
ready = await self.socket.poll(timeout)
44+
45+
if ready:
46+
return await self._recv()
47+
else:
48+
raise Empty
49+
50+
async def get_msgs(self):
51+
""" Get all messages that are currently ready. """
52+
msgs = []
53+
while True:
54+
try:
55+
msgs.append(await self.get_msg())
56+
except Empty:
57+
break
58+
return msgs
59+
60+
async def msg_ready(self):
61+
""" Is there a message that has been received? """
62+
return bool(await self.socket.poll(timeout=0))
63+
64+
def close(self):
65+
if self.socket is not None:
66+
try:
67+
self.socket.close(linger=0)
68+
except Exception:
69+
pass
70+
self.socket = None
71+
stop = close
72+
73+
def is_alive(self):
74+
return (self.socket is not None)
75+
76+
def send(self, msg):
77+
"""Pass a message to the ZMQ socket to send
78+
"""
79+
self.session.send(self.socket, msg)
80+
81+
def start(self):
82+
pass

0 commit comments

Comments
 (0)