From 1945461be3d9ee4f2a66f62b102850ab27066080 Mon Sep 17 00:00:00 2001 From: Ryan Munro Date: Mon, 27 Feb 2017 16:47:23 -0800 Subject: [PATCH] Work on v1.0.0 release --- README.md | 194 +++++++++++++++++----------------- channel.js | 32 ++++++ dedent.js | 24 +++++ index.js | 238 ++++++++++++++++-------------------------- json.js | 6 ++ node_python_bridge.py | 17 ++- package.json | 3 +- test.js | 4 +- 8 files changed, 271 insertions(+), 247 deletions(-) create mode 100644 channel.js create mode 100644 dedent.js create mode 100644 json.js diff --git a/README.md b/README.md index 566bab8..1a003c4 100644 --- a/README.md +++ b/README.md @@ -10,87 +10,94 @@ npm install python-bridge 'use strict'; let assert = require('assert'); +let Promise = require('bluebird'); let pythonBridge = require('python-bridge'); -let python = pythonBridge(); - -python.ex`import math`; -python`math.sqrt(9)`.then(x => assert.equal(x, 3)); +let pythonResource = pythonBridge(); // start Python interpreter -let list = [3, 4, 2, 1]; -python`sorted(${list})`.then(x => assert.deepEqual(x, list.sort())); +Promise.using(pythonResource, function *(python) { // resource management, closes interpreter when done + python`import math`; + let sqrt = yield python`math.sqrt(9)`; // commands queue up, so no need to wait on `import math` + assert.equal(sqrt, 3); -python.end(); + let list = [3, 4, 2, 1]; + let sorted = yield python`sorted(${list})`; + assert.deepEqual(sorted, list.sort()); +}); ``` # API -## var python = pythonBridge(options) - -Spawns a Python interpreter, exposing a bridge to the running processing. Configurable via `options`. +## let python = pythonBridge(options) -* `options.python` - Python interpreter, defaults to `python` +Spawns a Python interpreter, exposing a bridge to the running processing. -Also inherits the following from [`child_process.spawn([options])`](https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options). - -* `options.cwd` - String Current working directory of the child process -* `options.env` - Object Environment key-value pairs -* `options.stdio` - Array Child's stdio configuration. Defaults to `['pipe', process.stdout, process.stderr]` +* `options.python` - Python interpreter, defaults to `'python'` +* `options.cwd` - Working directory of the Python process, defaults to `process.cwd()` +* `options.env` - Environment variables, defaults to `process.env` +* `options.stdin` - Defaults to `'pipe'` +* `options.stdout` - Defaults to `process.stdout` +* `options.stderr` - Defaults to `process.stderr` * `options.uid` - Number Sets the user identity of the process. * `options.gid` - Number Sets the group identity of the process. ```javascript var python = pythonBridge({ - python: 'python3', - env: {PYTHONPATH: '/foo/bar'} + python: 'python3', + env: {PYTHONPATH: '/foo/bar'} }); ``` -## python`` `expression(args...)` ``.then(...) +## python`code`.then(...) -Evaluates Python code, returning the value back to Node. +Runs Python code in the interpreter, returning the value back to Node. ```javascript -// Interpolates arguments using JSON serialization. -python`sorted(${[6, 4, 1, 3]})`.then(x => assert.deepEqual(x, [1, 3, 4, 6])); +Promise.using(python, function *(python) { + // Interpolates arguments using JSON serialization. + let x = yield python`sorted(${[6, 4, 1, 3]})`; + assert.deepEqual(x, [1, 3, 4, 6]); + + // Passing key-value arguments + let obj = {hello: 'world', foo: 'bar'}; + let dict = yield python`dict(baz=123, **${obj})`; + assert.deepEqual(dict, {baz: 123, hello: 'world', foo: 'bar'}); + + // Define function in Python + python` + def hello(a, b): + return a + b + `; -// Passing key-value arguments -let obj = {hello: 'world', foo: 'bar'}; -python`dict(baz=123, **${obj})`.then(x => { - assert.deepEqual(x, {baz: 123, hello: 'world', foo: 'bar'}); + // Then call it + let a = 123, b = 321; + let hello = python`hello(${a}, ${b})`; + assert.equal(hello, a + b); }); ``` -## python.ex`` `statement` ``.then(...) - -Execute Python statements. - -```javascript -let a = 123, b = 321; -python.ex` - def hello(a, b): - return a + b -`; -python`hello(${a}, ${b})`.then(x => assert.equal(x, a + b)); -``` - -## python.lock(...).then(...) +## python.lock(python => ...).then(...) Locks access to the Python interpreter so code can be executed atomically. If possible, it's recommend to define a function in Python to handle atomicity. ```javascript -python.lock(python => { - python.ex`hello = 123`; - return python`hello + 321'`; -}).then(x => assert.equal(x, 444)); - -// Recommended to define function in Python -python.ex` +Promise.using(python, function *(python) { + let lock = yield python.lock(function *() { + python`hello = 123`; + let value = yield python`hello + 321'`; + return value; + }); + assert.equal(lock, 444); + + // If possible, it's better to define a Python function to do things atomically + python` def atomic(): - hello = 123 - return hello + 321 -`; -python`atomic()`.then(x => assert.equal(x, 444)); + hello = 123 + return hello + 321 + `; + let atomic = python`atomic()`; + assert.equal(atomic, 444); +}); ``` ## python.stdin, python.stdout, python.stderr @@ -98,56 +105,66 @@ python`atomic()`.then(x => assert.equal(x, 444)); Pipes going into the Python process, separate from execution & evaluation. This can be used to stream data between processes, without buffering. ```javascript +'use strict'; + let Promise = require('bluebird'); let fs = Promise.promisifyAll(require('fs')); -let fileWriter = fs.createWriteStream('output.txt'); - -python.stdout.pipe(fileWriter); +let python = pythonBridge({stdout: 'pipe'}); // listen on Python process's stdout -python.ex` +Promse.using(python, function *(python) { + let fileWriter = fs.createWriteStream('output.txt'); + python.stdout.pipe(fileWriter); + + yield python` import sys for line in sys.stdin: - sys.stdout.write(line) - sys.stdout.flush() -`.then(function () { - fileWriter.end(); - fs.readFileAsync('output.txt', {encoding: 'utf8'}).then(x => assert.equal(x, 'hello\nworld\n')); + sys.stdout.write(line) + sys.stdout.flush() + `; + fileWriter.end(); + let output = yield fs.readFileAsync('output.txt', {encoding: 'utf8'}); + assert.equal(output, 'hello\nworld\n')); }); // write to Python process's stdin -python.stdin.write('hello\n'); -setTimeout(() => { - python.stdin.write('world\n'); - python.stdin.end(); -}, 10); +Promse.using(python, function *(python) { + python.stdin.write('hello\n'); + yield Promise.delay(10); + python.stdin.write('world\n'); + python.stdin.end(); +}); ``` ## python.end() Stops accepting new Python commands, and waits for queue to finish then gracefully closes the Python process. -## python.disconnect() - -_Alias to [`python.end()`](#python-end)_ - ## python.kill([signal]) Send signal to Python process, same as [`child_process child.kill`](https://nodejs.org/api/child_process.html#child_process_event_exit). ```javascript let Promise = require('bluebird'); - -python.ex` - from time import sleep - sleep(9000) -`.timeout(100).then(x => { - assert.ok(false); -}).catch(Promise.TimeoutError, (exit_code) => { - console.error('Python process taking too long, restarted.'); - python.kill('SIGKILL'); - python = pythonBridge(); +let pythonResource = pythonBridge(); + +Promise.using(pythonResource, function *(python) { + try { + yield python` + from time import sleep + sleep(9000) + `.timeout(100); + assert.ok(false); // shouldn't ever hit this + } catch (e) { + if (e instanceof Promise.TimeoutError) { + console.error('Python process taking too long, restarted.'); + python.kill('SIGKILL'); + pythonResource = pythonBridge(); + } else { + throw e; + } + } }); ``` @@ -155,27 +172,26 @@ python.ex` We can use Bluebird's [`promise.catch(...)`](http://bluebirdjs.com/docs/api/catch.html) catch handler in combination with Python's typed Exceptions to make exception handling easy. - -## python.Exception +## pythonBridge.PythonException Catch any raised Python exception. ```javascript -python.ex` +python` hello = 123 print(hello + world) world = 321 -`.catch(python.Exception, () => console.log('Woops! `world` was used before it was defined.')); +`.catch(PythonException, () => console.log('Woops! `world` was used before it was defined.')); ``` -## python.isException(name) +## pythonBridge.isPythonException Catch a Python exception matching the passed name. ```javascript function pyDivide(numerator, denominator) { return python`${numerator} / ${denominator}` - .catch(python.isException('ZeroDivisionError'), () => Promise.resolve(Infinity)); + .catch(isPythonException('ZeroDivisionError'), () => Promise.resolve(Infinity)); } pyDivide(1, 0).then(x => { assert.equal(x, Infinity); @@ -183,14 +199,6 @@ pyDivide(1, 0).then(x => { }); ``` -## pythonBridge.PythonException - -_Alias to `python.Exception`, this is useful if you want to import the function to at the root of the module._ - -## pythonBridge.isPythonException - -_Alias to `python.isException`, this is useful if you want to import the function to at the root of the module._ - ---- # Features diff --git a/channel.js b/channel.js new file mode 100644 index 0000000..c18b325 --- /dev/null +++ b/channel.js @@ -0,0 +1,32 @@ +'use strict'; + +let Promise = require('bluebird'); + +module.exports = class Channel { + constructor() { + this._reader = []; + this._writer = []; + } + put(x) { + return new Promise(done => { + if (this._reader.length) { + let write = this._reader.shift(); + write(x); + done(); + } else { + this._writer.push([x, done]); + } + }); + } + get() { + return new Promise(write => { + if (this._writer.length) { + let item = this._writer.shift(), value = item[0], done = item[1]; + write(value); + done(); + } else { + this._reader.push(write); + } + }); + } +}; diff --git a/dedent.js b/dedent.js new file mode 100644 index 0000000..16a1ffe --- /dev/null +++ b/dedent.js @@ -0,0 +1,24 @@ +'use strict'; + +module.exports = function dedent(code) { + // dedent text + let lines = code.split('\n'); + let offset = null; + + // remove extra blank starting line + if (!lines[0].trim()) { + lines.shift(); + } + for (let line of lines) { + let trimmed = line.trimLeft(); + if (trimmed) { + offset = (line.length - trimmed.length) + 1; + break; + } + } + if (!offset) { + return code; + } + let match = new RegExp('^' + new Array(offset).join('\\s?')); + return lines.map(line => line.replace(match, '')).join('\n'); +}; diff --git a/index.js b/index.js index 33b141d..86ba84c 100644 --- a/index.js +++ b/index.js @@ -1,104 +1,89 @@ 'use strict'; let Promise = require('bluebird'); +let Channel = require('./channel'); +let dedent = require('./dedent'); +let json = require('./json'); let path = require('path'); let child_process = Promise.promisifyAll(require('child_process')); const PYTHON_BRIDGE_SCRIPT = path.join(__dirname, 'node_python_bridge.py'); -function pythonBridge(opts) { - // default options - let intepreter = opts && opts.python || 'python'; - let stdio = opts && opts.stdio || ['pipe', process.stdout, process.stderr]; - let options = { - cwd: opts && opts.cwd, - env: opts && opts.env, - uid: opts && opts.uid, - gid: opts && opts.gid, - stdio: stdio.concat(['ipc']) +function pythonBridge({ + python:intepreter='python', cwd, env, uid, gid, + stdin='pipe', stdout=process.stdout, stderr=process.stderr +}) { + return new Promise((resolve, reject) => { + let ps = child_process.spawn(intepreter, [PYTHON_BRIDGE_SCRIPT], { + cwd, env, uid, gid, stdio: [stdin, stdout, stderr, 'ipc'] + }); + + // test that it works + // @TODO + + ps.once('close', () => { + // @TODO handle closing + __proto__.connected = false; + }); + + // open up bridge + let lock = new Channel(); + let __proto__ = { + end: () => { + throw Error('@TODO NOT IMPLEMENTED'); + }, + kill: signal => { + __proto__.connected = false; + ps.kill(signal); + }, + pid: ps.pid, stdin: ps.stdin, stdout: ps.stdout, stderr: ps.stderr, + connected: true }; + lock.put(); + resolve(createPython(ps, __proto__, lock)); + }).disposer(python => { + python.end(); + }); +} - // create process bridge - let ps = child_process.spawn(intepreter, [PYTHON_BRIDGE_SCRIPT], options); - let queue = singleQueue(); - - function sendPythonCommand(type, enqueue, self) { - function wrapper() { - self = self || wrapper; - let code = json.apply(this, arguments); - let on_message, on_close; - - if (!(this && this.connected || self.connected)) { - return Promise.reject(new PythonBridgeNotConnected()); - } - - return enqueue(() => new Promise((resolve, reject) => { - ps.send({type: type, code: code}); - ps.once('message', onMessage); - ps.once('close', onClose); - - function onMessage(data) { - ps.removeListener('close', onClose); - if (data && data.type && data.type === 'success') { - resolve(data.value); - } else if (data && data.type && data.type === 'exception') { - reject(new PythonException(data.value)); - } else { - reject(data); - } - } - - function onClose(exit_code, message) { - ps.removeListener('message', onMessage); - if (!message) { - reject(new Error(`Python process closed with exit code ${exit_code}`)); - } else { - reject(new Error(`Python process closed with exit code ${exit_code} and message: ${message}`)); - } - } - })); +function createPython(ps, __proto__, lock) { + function python(c) { + let code = json.apply(this, arguments); + return lock.get().then(() => new Promise((resolve, reject) => { + ps.send({type: 'code', code: code}); + ps.once('message', onMessage); + ps.once('close', onClose); + + function onMessage(data) { + ps.removeListener('close', onClose); + if (data && data.type && data.type === 'success') { + resolve(data.value); + } else if (data && data.type && data.type === 'exception') { + reject(new PythonException(data.value)); + } else { + reject(data); } - return wrapper; - } + } - function setupLock(enqueue) { - return f => { - return enqueue(() => { - let lock_queue = singleQueue(); - let lock_python = sendPythonCommand('evaluate', lock_queue); - lock_python.ex = sendPythonCommand('execute', lock_queue, lock_python); - lock_python.lock = setupLock(lock_queue); - lock_python.connected = true; - lock_python.__proto__ = python; - - return f(lock_python); - }); - }; - } - - // API - let python = sendPythonCommand('evaluate', queue); - python.ex = sendPythonCommand('execute', queue, python); - python.lock = setupLock(queue); - python.pid = ps.pid; - python.connected = true; - python.Exception = PythonException; - python.isException = isPythonException; - python.disconnect = () => { - python.connected = false; - return queue(() => { - ps.disconnect(); - }); - }; - python.end = python.disconnect; - python.kill = signal => { - python.connected = false; - ps.kill(signal); - }; - python.stdin = ps.stdin; - python.stdout = ps.stdout; - python.stderr = ps.stderr; - return python; + function onClose(exit_code, message) { + ps.removeListener('message', onMessage); + if (!message) { + reject(new Error(`Python process closed with exit code ${exit_code}`)); + } else { + reject(new Error(`Python process closed with exit code ${exit_code} and message: ${message}`)); + } + } + }).finally(() => lock.put())); + } + python.lock = function (f) { + return lock.get().then(() => { + let nestedLock = new Channel(); + nestedLock.put(); + return Promise.try(f, createPython(ps, __proto__, nestedLock)).finally(() => lock.put()); + }); + }; + python.__proto__ == __proto__; + return python; } class PythonException extends Error { @@ -116,67 +101,26 @@ class PythonException extends Error { this.format = exc.format; } } - -class PythonBridgeNotConnected extends Error { - constructor() { - super('Python bridge is no longer connected.'); - } -} - function isPythonException(name) { - return exc => ( - exc instanceof PythonException && - exc.exception && - exc.exception.type.name === name - ); -} - -function singleQueue() { - let last = Promise.resolve(); - return function enqueue(f) { - let wait = last; - let done; - last = new Promise(resolve => { - done = resolve; - }); - return new Promise((resolve, reject) => { - wait.finally(() => { - Promise.try(f).then(resolve, reject); - }); - }).finally(() => done()); - }; -} - -function dedent(code) { - // dedent text - let lines = code.split('\n'); - let offset = null; - - // remove extra blank starting line - if (!lines[0].trim()) { - lines.shift(); - } - for (let line of lines) { - let trimmed = line.trimLeft(); - if (trimmed) { - offset = (line.length - trimmed.length) + 1; - break; - } - } - if (!offset) { - return code; - } - let match = new RegExp('^' + new Array(offset).join('\\s?')); - return lines.map(line => line.replace(match, '')).join('\n'); -} - -function json(text_nodes) { - let values = Array.prototype.slice.call(arguments, 1); - return dedent(text_nodes.reduce((cur, acc, i) => cur + JSON.stringify(values[i - 1]) + acc)); + return exc => exc instanceof PythonException && exc.exception && exc.exception.type.name === name; } pythonBridge.PythonException = PythonException; -pythonBridge.PythonBridgeNotConnected = PythonBridgeNotConnected; pythonBridge.isPythonException = isPythonException; -pythonBridge.json = json; module.exports = pythonBridge; + +// before 182 LOC + +/* +[ ] convert python stack trace to javascript stack trace +[ ] write tests for stack traces +[ ] provide better error reporting for common mistakes +[ ] use bluebird resources +[ ] use channels to implement locking +[ ] polymorphic expression / statement usage +[ ] test with gevent +[ ] test with asyncio +[ ] provide ASYNC usage!! +[ ] modernize readme to use async/await library +[ ] test randomly dying app +*/ diff --git a/json.js b/json.js new file mode 100644 index 0000000..56916fb --- /dev/null +++ b/json.js @@ -0,0 +1,6 @@ +'use strict'; + +module.exports = function json(text_nodes) { + let values = Array.prototype.slice.call(arguments, 1); + return dedent(text_nodes.reduce((cur, acc, i) => cur + JSON.stringify(values[i - 1]) + acc)); +}; diff --git a/node_python_bridge.py b/node_python_bridge.py index 684971c..69f0cd3 100644 --- a/node_python_bridge.py +++ b/node_python_bridge.py @@ -1,12 +1,12 @@ from __future__ import unicode_literals -import six import os import sys import json import traceback NODE_CHANNEL_FD = int(os.environ['NODE_CHANNEL_FD']) +UNICODE_TYPE = type(u'') def format_exception(t=None, e=None, tb=None): @@ -33,6 +33,16 @@ def format_exception(t=None, e=None, tb=None): writer = os.fdopen(NODE_CHANNEL_FD, 'w') reader = os.fdopen(NODE_CHANNEL_FD, 'r') + def write(data): + writer.write(json.dumps(data, separators=(',', ':')) + '\n') + writer.flush() + + # We're ready to accept commands! + write(dict( + type='ready', + version=tuple(sys.version_info) + )) + while True: try: # Read new command @@ -44,7 +54,7 @@ def format_exception(t=None, e=None, tb=None): # Assert data saneness if data['type'] not in ['execute', 'evaluate']: raise Exception('Python bridge call `type` must be `execute` or `evaluate`') - if not isinstance(data['code'], six.string_types): + if not isinstance(data['code'], UNICODE_TYPE): raise Exception('Python bridge call `code` must be a string.') # Run Python code @@ -57,8 +67,7 @@ def format_exception(t=None, e=None, tb=None): t, e, tb = sys.exc_info() response = dict(type='exception', value=format_exception(t, e, tb)) - writer.write(json.dumps(response, separators=(',', ':')) + '\n') - writer.flush() + write(response) # Closing is messy try: diff --git a/package.json b/package.json index e4f9cfd..36f943a 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,8 @@ { "name": "python-bridge", - "version": "0.1.2", + "version": "1.0.0", "description": "Node.js to Python bridge ✨🐍🚀✨", + "repository": "Submersible/node-python-bridge", "main": "index.js", "scripts": { "test": "tap test.js" diff --git a/test.js b/test.js index 9f0206b..917d86c 100644 --- a/test.js +++ b/test.js @@ -156,7 +156,7 @@ test('readme', t => { hello = 123 print(hello + world) world = 321 - `.catch(python.Exception, () => t.ok(true)); + `.catch(pythonBridge.PythonException, () => t.ok(true)); python.ex` hello = 123 @@ -166,7 +166,7 @@ test('readme', t => { function pyDivide(numerator, denominator) { return python`${numerator} / ${denominator}` - .catch(python.isException('ZeroDivisionError'), () => Promise.resolve(Infinity)); + .catch(pythonBridge.isPythonException('ZeroDivisionError'), () => Promise.resolve(Infinity)); } pyDivide(1, 0).then(x => { t.equal(x, Infinity);