1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- import socket
- import threading
- import numpy as np
class NeuracleDataClient:
    """TCP client that keeps a rolling buffer of streamed float32 samples.

    On construction the client connects to ``host:port`` and starts a
    background thread that reads the byte stream and appends it, in chunks of
    whole samples, to ``self.buffer``.  One sample is ``n_channel``
    little-endian 4-byte floats; the last channel is treated as the trigger
    channel by :meth:`get_trial_data`.

    Only the most recent ``max_buffer_length`` chunks are kept.
    """

    UPDATE_INTERVAL = 0.04  # nominal duration of one received chunk, in seconds
    BYTES_PER_NUM = 4  # size of one value on the wire (float32)
    BUFFER_LEN = 1  # nominal buffer length, in seconds

    def __init__(self, n_channel=9, samplerate=1000, host='localhost', port=8712):
        """Connect to the data server and start receiving.

        :param n_channel: number of values per sample, trigger channel included
        :param samplerate: samples per second sent by the server
        :param host: server host name or address
        :param port: server TCP port
        :raises ValueError: if ``n_channel`` is smaller than 1
        :raises OSError: if the connection cannot be established
        """
        if n_channel < 1:
            raise ValueError('n_channel must be at least 1, got %r' % (n_channel,))
        self.n_channel = n_channel
        # Number of bytes that UPDATE_INTERVAL seconds of signal occupy.
        self.chunk_size = int(self.UPDATE_INTERVAL * samplerate * self.BYTES_PER_NUM * n_channel)
        self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        self.buffer = []
        self.max_buffer_length = int(self.BUFFER_LEN / self.UPDATE_INTERVAL)
        self._host = host
        self._port = port
        # Guards every access to self.buffer (shared with the receive thread).
        self.lock = threading.Lock()
        self.__datathread = threading.Thread(target=self.__recv_loop)
        self.samplerate = samplerate
        # start client
        self.__config()

    def __config(self):
        """Connect the socket and launch the receive thread."""
        try:
            self.__sock.connect((self._host, self._port))
        except OSError:
            # Do not leak the descriptor when the server is unreachable.
            self.__sock.close()
            raise
        self.__run_forever()

    def is_active(self):
        """Return True while the client socket is open."""
        return self.__sock.fileno() != -1

    def close(self):
        """Close the connection and wait for the receive thread to finish."""
        try:
            # shutdown() wakes a recv() that is blocked in the receive thread;
            # close() alone may leave it blocked and make join() hang.
            self.__sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # socket already closed or no longer connected
        self.__sock.close()
        worker = self.__datathread
        if worker.is_alive() and worker is not threading.current_thread():
            worker.join()

    def __recv_loop(self):
        """Receive bytes until the connection ends and buffer whole samples.

        TCP may deliver the stream split at arbitrary byte positions, so the
        bytes that do not yet complete a sample are carried over to the next
        read instead of being dropped (dropping them would shift every later
        value).  Each buffered chunk therefore holds whole samples only.
        """
        frame_bytes = self.n_channel * self.BYTES_PER_NUM
        # Never request 0 bytes: recv(0) returns b'' and would look like EOF.
        request_size = max(self.chunk_size, frame_bytes)
        pending = bytearray()
        try:
            while self.__sock.fileno() != -1:
                try:
                    data = self.__sock.recv(request_size)
                except OSError:
                    break
                if not data:
                    # Empty result means the peer closed the connection.
                    break
                pending += data
                usable = len(pending) - len(pending) % frame_bytes
                if not usable:
                    continue
                chunk = bytes(pending[:usable])
                del pending[:usable]
                with self.lock:
                    self.buffer.append(chunk)
                    # remove old data
                    if len(self.buffer) > self.max_buffer_length:
                        del self.buffer[0]
        finally:
            # Make is_active() report the end of the stream.
            self.__sock.close()

    def __run_forever(self):
        """Start the background receive thread."""
        self.__datathread.start()

    def get_trial_data(self, clear=False):
        """Copy the buffered signal out of the rolling buffer.

        :param clear: when True, empty the buffer after copying it
        :return: tuple ``(samplerate, events, data)``

            - samplerate: the sample rate given to the constructor
            - events: ndarray (n_events, 3), [onset, duration, event_label];
              onset is the sample index within ``data``, duration is always 0
              and event_label is the non-zero value of the trigger channel
            - data: ndarray with shape (channels, timesteps), trigger channel
              excluded
        :raises ValueError: if the buffered bytes do not form whole samples
        """
        frame_bytes = self.n_channel * self.BYTES_PER_NUM
        with self.lock:
            raw = b''.join(self.buffer)
            if len(raw) % self.BYTES_PER_NUM != 0:
                raise ValueError('buffered byte count %d is not a multiple of %d'
                                 % (len(raw), self.BYTES_PER_NUM))
            if len(raw) % frame_bytes != 0:
                raise ValueError('buffered data does not contain whole samples '
                                 'of %d channels' % self.n_channel)
            if clear:
                # Cleared under the same lock as the copy, so no chunk can be
                # discarded without having been returned.
                self.buffer.clear()

        n_samples = len(raw) // frame_bytes
        # bytearray keeps the resulting array writable, as before.
        values = np.frombuffer(bytearray(raw), dtype='<f')
        data = np.reshape(values, (n_samples, self.n_channel))

        trigger_channel = data[:, -1]
        onset = np.flatnonzero(trigger_channel)
        event_label = trigger_channel[onset]
        events = np.stack((onset, np.zeros_like(onset), event_label), axis=1)
        return self.samplerate, events, data[:, :-1].T
|