Commit 3b014bdf authored by Dan Wilcox's avatar Dan Wilcox
Browse files

reimplemented using asyncio and websockets library

parent d5b0e4e3
0.2.0: 2021 Aug 02
* reimplemented using asyncio and websockets library
* added verbose printing commandline flag
* now exits cleanly via signal handler
0.1.0: 2021 Jul 01
initial version from IM team hackathon
......@@ -19,7 +19,7 @@ Dependencies
------------
* Python 3
* [simple-websocket-server](https://github.com/dpallot/simple-websocket-server)
* [websockets library](https://github.com/aaugustin/websockets)
Setup
-----
......@@ -37,10 +37,10 @@ python3 -m venv venv-baton
source venv-baton/bin/activate
```
Install the websocket server library via pip:
Install the websockets library via pip:
```shell
pip3 install git+https://github.com/dpallot/simple-websocket-server.git
pip3 install git+https://github.com/aaugustin/websockets.git
```
Running
......@@ -63,12 +63,12 @@ To configure the send/receive address and ports, see the commandline argument he
Defaults are:
* websocket: ws://localhost:8081
* udp recv: localhost 9999
* udp send: localhost 8888
* udp recv: 127.0.0.1 9999
* udp send: 127.0.0.1 8888
_Note: To connect external devices to the machine running baton, "localhost" cannot be used and only clients running on the same machine will be able to connect. Use the network IP address or local DNS hostname instead for baton and both local and remote clients, ie. 192.168.0.101, etc._
To stop baton, use CTRL+C to issue an interrupt signal. You need to do this a couple of times until it exits completely.
To stop baton, use CTRL+C to issue an interrupt signal.
When finished, deactivate the virtual environment with:
......
......@@ -7,17 +7,13 @@
# For information on usage and redistribution, and for a DISCLAIMER OF ALL
# WARRANTIES, see the file, "LICENSE.txt," in this distribution.
#
# This code has been developed at ZKM | Hertz-Lab as part of „The Intelligent
# This code has been developed at ZKM | Hertz-Lab as part of „The Intelligent
# Museum“ generously funded by the German Federal Cultural Foundation.
#
# TODO:
# * add signal handler to exit gracefully? probably needs nonblocking io...
# * verbose event messaging instead of commenting print() out
# * replace SimpleWebSocketServer with websockets lib?
# * replace UDP code with asyncio versions
from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket
import socket, threading, argparse
import asyncio
import websockets
import signal
import argparse
##### parser
......@@ -30,89 +26,180 @@ parser.add_argument(
default=8081, type=int, help="websocket port ie. ws://localhost:####, default: 8081")
parser.add_argument(
"--recvaddr", action="store", dest="recvaddr",
default="localhost", help="udp receive addr, default: localhost")
default="127.0.0.1", help="udp receive addr, default: 127.0.0.1")
parser.add_argument(
"--recvport", action="store", dest="recvport",
default=9999, type=int, help="udp receive port, default: 9999")
parser.add_argument(
"--sendaddr", action="store", dest="sendaddr",
default='localhost', help="udp send port, default: localhost")
default="127.0.0.1", help="udp send port, default: 127.0.0.1")
parser.add_argument(
"--sendport", action="store", dest="sendport",
default=8888, type=int, help="udp send addr, default: 8888")
parser.add_argument("-v", "--verbose", action="store_true", dest="verbose",
help="enable verbose printing")
##### UDP
# simple UDP sender socket wrapper
class UDPSender(object):
# init with address, port, and optional brodcast (aka multicast)
def __init__(self, address, port, broadcast=False):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock.setblocking(0)
if broadcast:
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.address = address
self.port = port
# send raw data
def send(self, data):
self._sock.sendto(data, (self.address, self.port))
# simple UDP receiver socket wrapper
class UDPReceiver(object):
def __init__(self, port, address="localhost"):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock.bind((address, port))
self.running = True
# simple UDP sender asyncio protocol
class UDPSender:
def listenforever(self, wsserver):
print("udp: starting")
while self.running:
data, addr = self._sock.recvfrom(1024)
#print("udp: received ", data)
for client in wsserver.connections:
#print(wsserver.connections[client], "sending")
wsserver.connections[client].sendMessage(data)
@staticmethod
def create(loop, remote_addr, verbose):
task = asyncio.Task(loop.create_datagram_endpoint(
lambda: UDPSender(verbose=verbose),
remote_addr=remote_addr, allow_broadcast=True))
_, udpsender = loop.run_until_complete(task)
return udpsender
##### WebSocket
def __init__(self, verbose=True):
self.transport = None
self.verbose = verbose # verbosity
# websocket -> UDP relay implementation
class WSServer(WebSocket):
def close(self):
self.transport.close()
# setup UDP sender object
def __init__(self, server, sock, address):
WebSocket.__init__(self, server, sock, address)
self.udpsender = UDPSender(args.sendaddr, args.sendport)
# client connect callback
def handleConnected(self):
print("websocket: connected", self.address)
# simply relay raw messages to UDP client
def handleMessage(self):
#print("websocket: received ", self.data)
self.udpsender.send(self.data)
# client disconnect callback
def handleClose(self):
print("websocket: disconnected", self.address)
def send(self, data):
self.transport.sendto(data)
if self.verbose:
print(f"udp sender: sent {data}")
def connection_made(self, transport):
self.transport = transport
if self.verbose:
print(f"udp sender: connected")
def connection_lost(self, exc):
if self.verbose:
print("udp sender: disconnected, error:", exc)
# simple UDP receiver asyncio protocol
class UDPReceiver:
@staticmethod
def create(loop, local_addr, server, verbose):
task = asyncio.Task(loop.create_datagram_endpoint(
lambda: UDPReceiver(server=server, verbose=verbose),
local_addr=local_addr, allow_broadcast=True))
_, udpreceiver = loop.run_until_complete(task)
return udpreceiver
def __init__(self, loop=None, server=None, verbose=True):
self.transport = None
self.server = server # websocket server
self.verbose = verbose # verbosity
def close(self):
self.transport.close()
def connection_made(self, transport):
self.transport = transport
if self.verbose:
sockname = transport.get_extra_info("sockname")
print(f"udp receiver: connected {sockname}")
# relay raw datagrams to websocket clients
def datagram_received(self, data, addr):
if self.verbose:
print(f"udp receiver: received {data} from {addr}")
_ = asyncio.create_task(WebSocketRelayServer.send(data))
def connection_lost(self, exc):
if self.verbose:
print(f"udp receiver: disconnected, error: {exc}")
##### websocket
# lazy static class wrapper as websockets.serve only takes a function for ws_handler,
# quicker than figuring out how to implement a custom WebSocketServerProtocol
class WebSocketRelayServer:
clients = set() # connected clients
sender = None # udp sender
verbose = False # verbosity
@staticmethod
def create(loop, addr, sender, verbose=False):
host, port = addr
task = websockets.serve(ws_handler=WebSocketRelayServer.relay, host=host, port=port)
server = loop.run_until_complete(task)
WebSocketRelayServer.sender = sender
WebSocketRelayServer.verbose = verbose
if verbose:
print(f"websocket: connected {addr}")
return server
@staticmethod
async def register(websocket):
WebSocketRelayServer.clients.add(websocket)
if WebSocketRelayServer.verbose:
print("websocket: client connected", websocket)
@staticmethod
async def unregister(websocket):
WebSocketRelayServer.clients.remove(websocket)
if WebSocketRelayServer.verbose:
print("websocket: client disconnected", websocket)
# send data to all connected clients
@staticmethod
async def send(data):
if len(WebSocketRelayServer.clients) > 0:
tasks = []
for websocket in WebSocketRelayServer.clients:
tasks.append(asyncio.create_task(websocket.send(data)))
await asyncio.wait(tasks)
# relay raw websocket messages to UDP sender
@staticmethod
async def relay(websocket, path):
await WebSocketRelayServer.register(websocket)
try:
async for data in websocket:
if WebSocketRelayServer.verbose:
print(f"websocket: received {data}")
if WebSocketRelayServer.sender is not None:
WebSocketRelayServer.sender.send(data)
except Exception as exc:
# ignore "normal" disconnects, from websockets/exceptions.py:
# 1000 "OK"
# 1006 "connection closed abnormally [internal]"
if exc.code != 1000 and exc != 1006:
print(f"websocket: read error: {exc}")
finally:
await WebSocketRelayServer.unregister(websocket)
##### signal
# signal handler for nice exit
def sigint_handler():
print("\ncaught signal, exiting...")
asyncio.get_running_loop().stop()
##### GO
# parse
args = parser.parse_args()
print(f"send -> udp {args.recvaddr}:{args.recvport} -> ws://{args.wshost}:{args.wsport}")
print(f"recv <- udp {args.sendaddr}:{args.sendport} <- ws://{args.wshost}:{args.wsport}")
# websocket server
wsserver = SimpleWebSocketServer(args.wshost, args.wsport, WSServer)
# set up event loop
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, sigint_handler)
# UDP server
udpserver = UDPReceiver(args.recvport, args.recvaddr)
def listenUDP():
udpserver.listenforever(wsserver)
threading.Thread(target=listenUDP).start()
# udp sender
udpsender = UDPSender.create(loop, remote_addr=(args.sendaddr, args.sendport), verbose=args.verbose)
print("websocket: starting")
wsserver.serveforever()
# websocket server
relayserver = WebSocketRelayServer.create(loop, addr=(args.wshost, args.wsport), sender=udpsender, verbose=args.verbose)
# udp receiver
udpreceiver = UDPReceiver.create(loop, local_addr=(args.recvaddr, args.recvport), server=relayserver, verbose=args.verbose)
# run forever
try:
loop.run_forever()
finally:
udpsender.close()
udpreceiver.close()
relayserver.close()
#N canvas 417 265 806 318 12;
#N canvas 634 126 806 318 12;
#X obj 85 221 oscparse;
#X obj 85 253 print;
#X msg 108 154 listen 0;
......@@ -6,7 +6,7 @@
#X obj 482 222 netsend -u -b;
#X msg 536 175 disconnect;
#X obj 482 266 tgl 15 0 empty empty connected? 17 7 0 10 -262144 -1
-1 0 1;
-1 1 1;
#X obj 352 144 oscformat /foo;
#X msg 352 113 list bar 1 2 3;
#X msg 85 115 listen 8888 239.200.200.200;
......@@ -16,8 +16,10 @@
#X text 355 46 send;
#X msg 494 131 connect 239.200.200.200 6001;
#X text 707 122 multicast;
#X msg 482 84 connect localhost 9999;
#X text 21 9 send/recv OSC messages over UDP;
#X msg 346 193 1 2 3 4;
#X msg 322 84 list baz 4 5 6;
#X msg 482 84 connect localhost 9999;
#X connect 0 0 1 0;
#X connect 2 0 3 0;
#X connect 3 0 0 0;
......@@ -28,4 +30,6 @@
#X connect 9 0 3 0;
#X connect 10 0 3 0;
#X connect 14 0 4 0;
#X connect 16 0 4 0;
#X connect 17 0 4 0;
#X connect 18 0 7 0;
#X connect 19 0 4 0;
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment