File size: 3,230 Bytes
f96995c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
import socket
import numpy as np
import pickle
import threading
import time
from modules_planning.common.communication import *
np.set_printoptions(precision=3, suppress=True)
class udpSender:
def __init__(self, port: dict):
self.port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
def send(self, data: dict):
self.sock.sendto(pickle.dumps(data), ('127.0.0.1', self.port))
def close(self):
self.sock.close()
class udpReceiver:
def __init__(self, ports: dict, re_use_address = False):
self.ports = ports
self.re_use_address = re_use_address
self.results = {}
self.lock = threading.Lock()
self.thread = None
def receive(self, name, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if self.re_use_address:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('127.0.0.1', port))
# sock.settimeout(0.1) # Set timeout to 1 second
while self.alive:
try:
t0 = time.time()
try:
data, addr = sock.recvfrom(32768)
except socket.timeout:
continue
data = pickle.loads(data)
with self.lock:
self.results[name] = data
t1 = time.time()
# print(f"Receive {name} data from {addr} in {t1-t0:.6f} seconds")
except BaseException as e:
print(f"Error in receiving {name} data: {e}")
break
sock.close()
print(f"Receive {name} thread stopped")
def start(self):
self.alive = True
self.thread = []
for name, port in self.ports.items():
t = threading.Thread(target=self.receive, args=(name, port))
t.start()
self.thread.append(t)
def stop(self):
print("Stopping udpReceiver")
self.alive = False
for t in self.thread:
if t is not None and t.is_alive():
t.join()
print("udpReceiver stopped")
def get(self, name=None, pop=False):
with self.lock:
if name is None:
ret = self.results
if pop:
self.results = {}
return ret
else:
if name in self.results:
if pop:
ret = self.results[name]
del self.results[name]
return ret
else:
return None
if __name__ == '__main__':
udpReceiver = udpReceiver(ports=
{
'object_pose': OBJECT_POSE_PORT,
'xarm_state': XARM_STATE_PORT
})
udpReceiver.start()
try:
while True:
print(udpReceiver.get())
print()
time.sleep(0.01)
except KeyboardInterrupt:
try:
udpReceiver.lock.release()
except:
pass
udpReceiver.stop()
|