This module includes utilities for python which are used within the dareplane framework. It contains functionality which shared can be reused within multiple modules. This currently includes:
- A
DefaultServer
- which will be loaded an extended within each module to implement the dareplane API -
logging
- which contains the standard formatting and a SocketHandler which is modified to sendjson
representations of the logging records to the default logging server port (9020). This is used to enable cross process logging. - A
StreamWatcher
implementation - which is a utility class to query a single LSL stream into a ring buffer.
This default server is used by all Dareplane
python modules as a starting
point for their TCP
socket. The idea is to have a single source for common
functionality and patch everything that is model specific on top of this
Currently we are faced with two functional incarnations of servers
- Spawning functionality from the server in a separate thread, being linked via events to the main thread (usually the server).
- Spawning a subprocess for running functionality - Currently necessary for running
psychopy
as it cannot be run from outside the main thread.
The logging tools allow two main entry point, which are from dareplane_utils.logging.logger import get_logger
, which is used to get a logger with the default configuration and from dareplane_utils.logging.server import LogRecordSocketReceiver
which is used to spawn up a server for consolidating logs of different processes.
StreamWatcher are a convenient utility around LSL stream inlets. They are basically a ring buffer for reading data to a numpy array. StreamWatchers are:
- initialized with a target stream name and a buffer size in seconds specified by
buffer_size
- connected to the target LSL stream
- updated to fetch the latest data (usually done in a loop)
from dareplane_utils.stream_watcher.lsl_stream_watcher import StreamWatcher
STREAM_NAME = "my_stream"
BUFFER_SIZE_S = 5 # the required buffer size will be calculated from the LSL
# streams meta data
sw = StreamWatcher(
STREAM_NAME,
buffer_size_s=BUFFER_SIZE_S,
)
# Either use the self.name or a provided identifier dict to hook up to an LSL stream
sw.connect_to_stream()
sw.update()
Update will call the following method:
def update(self):
"""Look for new data and update the buffer"""
samples, times = self.inlet.pull_chunk()
self.add_samples(samples, times)
self.samples = samples
self.n_new += len(samples)
To get the data from the StreamWatcher you can either grab the full ring buffer from the instance attributes
sw.buffer # ring buffer for data
sw.buffer_t # ring buffer for time stamps
sw.curr_i # current position of the head in the ring buffer
or you usually want the more convenient way by using the unfold_buffer
method,
which returns a chronologically sorted array ([-1] is the most recent data
point and [0] is the oldest data point).
sw.unfold_buffer() # sorted data
sw.unfold_buffer_t() # sorted time stamps
## The above is using the following implementation
def unfold_buffer(self):
return np.vstack(
[self.buffer[self.curr_i :], self.buffer[: self.curr_i]]
)
- channel names are only initialized on connection