import socket import threading import numpy as np from scipy import signal class NeuracleDataClient: UPDATE_INTERVAL = 0.04 BYTES_PER_NUM = 4 def __init__(self, n_channel=9, samplerate=1000, host='localhost', port=8712, buffer_len=1.): self.n_channel = n_channel self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) self.chunk_size = int(self.UPDATE_INTERVAL * samplerate * self.BYTES_PER_NUM * n_channel) self.buffer = [] self.max_buffer_length = int(buffer_len * samplerate) self._host = host self._port = port # thread lock self.lock = threading.Lock() self.__datathread = threading.Thread(target=self.__recv_loop) self.samplerate = samplerate self.filter = OnlineHPFilter(1, samplerate) # start client self.__config() def __config(self): self.__sock.connect((self._host, self._port)) self.__run_forever() def is_active(self): return self.__sock.fileno() != -1 def close(self): self.__sock.close() self.__datathread.join() def __recv_loop(self): while self.is_active(): try: data = self.__sock.recv(self.chunk_size) except OSError: break if len(data) % 4 != 0: continue # unpack data data = self._unpack_data(data) # do highpass (exclude stim channel) data[:, :-1] = self.filter.filter_incoming(data[:, :-1]) # update buffer self.lock.acquire() self.buffer.extend(data.tolist()) # remove old data old_data_len = len(self.buffer) - self.max_buffer_length if old_data_len > 0: self.buffer = self.buffer[old_data_len:] self.lock.release() def _unpack_data(self, bytes_data): byte_data = bytearray(bytes_data) if len(byte_data) % 4 != 0: raise ValueError data = np.frombuffer(byte_data, dtype='