Scalar Magnetometer

FieldLine Industries’ scalar magnetometer (SM) products are accessed via the pyflind.smx00.SMx00 object which currently supports the SM200 and SM300 generations of devices.

class pyflind.smx00.SMx00(device_uri: str, max_hist: int = 1000, lm_state_hist: int = 120, multiproc: bool = False)

Bases: FliDevice

FieldLine Industries Scalar Magnetometer device object.

An FliDevice for all scalar magnetometer products. The generation of device is automatically detected and appropriate stream and register maps provided upon instantiation.

Parameters:
  • device_uri – device connection string (see Device Communication)

  • max_hist – max number of samples of history to keep per stream

  • lm_state_hist – max number of logic module state transitions to track

  • multiproc – use process instead of thread for device io/codec

Rb87_GYROMAG_RATIO = 6995830000.0

Rubidium 87 gyromagnetic ratio in Hz/T

LARMOR_FREQ_RATIO = 0.000931322574832319

Conversion factor for larmor_freq stream to Hz

get_ident() Mapping[str, str | int]

Get device identifying information.

See also

get_model()

Returns:

device identity key/value pairs

Return type:

dict

get_fw_version_raw_reg() int

Get the device firmware version in raw 32-bit format from registers.

Returns:

32-bit firmware version

Return type:

int

get_fw_version_raw_stream() int

Get the device firmware version in raw 32-bit format from stream.

Returns:

32-bit firmware version

Return type:

int

get_fw_version(version_int: None | int = None) str

Get or convert a firmware version int to string format.

Parameters:

version_int – convert raw version if supplied, read version if None (default)

Returns:

human readable firmware version

Return type:

str

get_serial_raw_reg(prefix: str) int

Get a raw integer serial number from registers.

Parameters:

prefix – register prefix to read

Returns:

sensor serial

Return type:

int

get_serial_raw_stream(prefix: str) int

Get a raw integer serial number from streams.

Parameters:

prefix – stream prefix to read

Returns:

sensor serial

Return type:

int

get_sensor_serial(serial_raw: None | int = None) str

Get or convert the sensor serial number to string format.

Parameters:

serial_raw – convert raw serial if supplied, read if None (default)

Returns:

base-36 encoded sensor serial number

Return type:

str

get_electronics_serial(serial_raw: None | int = None) int

Get or convert the electronics serial number to string format.

Parameters:

serial_raw – convert raw serial if supplied, read if None (default)

Returns:

base-36 encoded electronics serial number

Return type:

str

get_baudrate() int

Get internal device baudrate.

Returns:

baud rate (bits/s), None if N/A or unknown

Return type:

int

set_baudrate(baud: int) None | int

Set the device serial baudrate.

This function changes the device baudrate via the uart_rate register to the requested value using the existing serial interface and then attempts to change the transport baudrate. If the transport doesn’t support baudrate changes (e.g. TCP) it will be required to close the connection to the device and establish a new one at this requested baud before communication can continue. It is recommended to reset the streaming schedule prior to a baudrate change to avoid decoding errors around the transition.

Parameters:

baud – uart baudrate

Returns:

actual baudrate on success, None if unsupported

Return type:

None|int

get_logic_module_state() None | str

Get the current logic module state.

Returns:

None if logic_module_state stream is disabled, current logic_module_state if stream is enabled

Return type:

None|str

get_logic_module_state_history() Sequence[tuple[str, int]]

Get the logic module state history.

Any time the logic_module_state stream is enabled it is monitored for state changes and the sample index of state changes is logged in a queue. This function empties and returns the queue when called.

Returns:

list((state, n))

start_sensor(timeout: float = 0) tuple[bool, Mapping[str, float]]

Start the sensor.

By default the function returns immediately after requesting the sensor start. If a nonzero timeout is provided the logic_module_state stream will be monitored and the function will only return when the magnetic_lock state is entered or the timeout elapses. When waiting for startup the time spent in each startup state is also tracked and returned.

Parameters:

timeout – seconds to wait for sensor to lock, 0 for non-blocking

Returns:

True if timeout == 0 or if sensor entered magnetic_lock False otherwise dict of times spent in each state during startup

Return type:

bool, dict

stop_sensor() None

Stop the sensor.

set_led(color: SMx00LEDColor, blink: bool = False, brightness: int = 8, manual_ctrl: bool = True) None

Set the LED configuration

By default the device LED color and blinking is set automatically to indicate the active state of the sensor. In this automatic mode the brightness is still configurable, or complete manual control can be enabled.

Parameters:
  • colorSMx00LEDColor to set, ignored if manual_ctrl is False

  • blink – True to blink the LED, False for solid. Ignored if manual_ctrl is False

  • brightness – 0-255 brightness value

  • manual_ctrl – True to ignore state and set the color/blink, False to track state

close() None

Close the device.

See also

on_close()

configure_stream(sid: str | int, enable: bool, timeout: float = 10) None

Configure a data stream.

Enable or disable a device stream. By default the call is blocking and waits for the stream change to take effect before returning and throws an exception upon timeout. If non-blocking operation is desired provide a timeout of 0, in which case the stream change will not be in effect when the call returns but will take effect at some future point in time.

See also

Parameters:
  • sid – stream name or id

  • enable – True to enable, False to disable

  • timeout – seconds to wait for stream, 0 for no wait

get_active_streams() Sequence[str | int]

Get list of active streams.

Returns:

stream ids

Return type:

list

get_io_stats(timeout: float = 1) Mapping[str, int | float]

Get the device IO statistics.

Parameters:

timeout – timeout in seconds (default 1)

Returns:

RX/TX byte count, RX/TX rate in B/s

Return type:

dict

get_max_history() int

Get the max length of sample history.

Returns:

max number of samples in history buffers

Return type:

int

get_max_samplerate(n_streams: int = 0) float

Compute the maximum samplerate which can achieve the desired number of streams.

Parameters:

n_streams – number of streams required, 0 for current streams

Returns:

maximum samplerate

Return type:

float

get_max_streams(baud: None | int = None, fs: None | float = None, timeout: float = 1) int

Compute the max number of streams supported for a given baud and samplerate.

Parameters:
  • baud – serial communications bitrate, defaults to current

  • fs – stream samplerate, defaults to current

  • timeout – timeout in seconds (default 1)

Returns:

max number of streams supportable

Return type:

int

get_model() str

Get the device model string.

Current supported models:
  • ‘SM200’

  • ‘SM300’

See also

get_ident()

Returns:

model

Return type:

str

get_register(address: str | int, timeout: float = 5) None | int

Read a register value.

This function performs a blocking read with timeout when timeout is nonzero. If a register value callback has been assigned it will be called in the RX thread context when the value is received. If no timeout is requested the command is sent and None is immediately returned.

See also

Parameters:
  • address – register address

  • timeout – blocking timeout in seconds, 0 to perform non-blocking (default 5)

Returns:

int value if blocking read successful, None otherwise

get_register_map() Mapping[str | int, int | str]

Get the device register name to ID map.

Returns:

register name to ID mappings

Return type:

dict

get_samplerate() float

Get the streaming samplerate.

See also

set_samplerate()

Returns:

samplerate in Hz

Return type:

float

get_samples() Mapping[str | int, Sequence[float]]

Read all buffered stream samples.

Stream samples are automatically buffered to an internal history with a configurable maximum length. This function returns the complete history available at the time of call as a dict of numpy arrays of samples. Subsequent calls will return samples immediately following those returned by the current call, i.e. no samples are dropped so the results from multiple consecutive calls can be concatenated.

Returns:

key=stream id, value=np.array of samples

Return type:

dict

get_stream_block_size() int

Get the size of stream blocks.

Stream samples are internally buffered in blocks of this size. Each stream block is passed to the on_stream_block() callback and are queued in the stream history. When the history is fetched via get_samples() all blocks in the history are concatenated and returned.

Returns:

max number of samples in history buffers

Return type:

int

get_stream_map() Mapping[str | int, int | str]

Get the device stream name to ID map.

Returns:

stream name to ID mappings

Return type:

dict

inc_register(address: str | int, increment: int = 1, timeout: float = 1) None

Increment a register value.

This function performs a blocking read with timeout to get the register value then writes the incremented value back.

See also

Parameters:
  • address – register address

  • timeout – blocking timeout in seconds, 0 to perform non-blocking (default 1)

Returns:

int value if blocking read successful, None otherwise

measure_stream_avg(sid: str | int, time: float = 1.0) float

Measure a stream’s average value.

Enable the stream if needed, record it for the specified time and compute it’s average value. If the stream was enabled it is also disabled before returning.

Note

This function consumes samples from the internal history so subsequent calls to get_samples() will begin with samples acquired after those used to compute the average.

Parameters:
  • sid – stream name or id

  • time – length of time to record

Returns:

average value of stream

Return type:

float

on_close(callback: None | Callable[[Any], None] = None, arg: Any = None) tuple[None | Callable[[Any], None], Any]

Set the callback function for device close.

The callback will be invoked in IO thread context when the device IO channel is closed.

See also

close()

Parameters:

callback – function(arg) or None to de-register

Returns:

previously registered callback or None, None

Return type:

(callback, arg)

on_register_value(callback: None | Callable[[Any, str | int, int], None] = None, arg: Any = None) tuple[None | Callable[[Any, str | int, int], None], Any]

Set the callback function for reading register values.

The callback will be invoked in IO thread context for each register value received. After registering a callback via this function a read request can be sent with get_register.

See also

get_register()

Parameters:

callback – function(arg, address, value) or None to de-register

Returns:

previously registered callback or None, None

Return type:

(callback, arg)

on_stream_add(callback: None | Callable[[Any, str | int], None] = None, arg: Any = None) tuple[None | Callable[[Any, str | int], None], Any]

Set the callback function for stream channel addition.

The callback will be invoked in IO thread context each time a stream channel is added.

Parameters:

callback – function(arg, stream_id) or None to de-register

Returns:

previously registered callback or None, None

Return type:

(callback, arg)

on_stream_block(callback: None | Callable[[Any, Mapping[str | int, Sequence[float]]], None] = None, arg: Any = None) tuple[None | Callable[[Any, Mapping[str | int, Sequence[float]]], None], Any]

Set the callback function for receiving stream data blocks.

The callback will be invoked in IO thread context with a block of data containing up to the stream block size samples for each stream. The data block is a dict a with the following keys:

  • ‘rtt’: last measured round-trip-time in seconds

  • ‘rx_t’: system time at which the sample was received

  • ‘n’: running sample index (reset to 0 on sync)

  • …: one per active stream

Note

The internal sample history can be used simultaneously with this callback interface. See get_samples().

Parameters:

callback – function(arg, data) or None to de-register

Returns:

previously registered callback or None, None

Return type:

(callback, arg)

on_stream_del(callback: None | Callable[[Any, str | int], None] = None, arg: Any = None) tuple[None | Callable[[Any, str | int], None], Any]

Set the callback function for stream channel removal.

The callback will be invoked in IO thread context each time a stream channel is removed.

Parameters:

callback – function(arg, stream_id) or None to de-register

Returns:

previously registered callback or None, None

Return type:

(callback, arg)

on_stream_samplerate(callback: None | Callable[[Any, int], None] = None, arg: Any = None) tuple[None | Callable[[Any, int], None], Any]

Set the callback function for stream samplerate change.

The callback will be invoked in IO thread context each time the stream samplerate is changed.

See also

set_samplerate()

Parameters:

callback – function(arg, samplerate) or None to de-register

Returns:

previously registered callback or None, None

Return type:

(callback, arg)

reset_schedule() None

Reset the streaming schedule.

This function disables all active streams simultaneously with a single operation.

set_max_history(max_hist: int, preserve: bool = True) None

Set the max length of sample history.

Allocate new sample history queues and optionally preserve the existing sample history.

Parameters:
  • max_hist – new max number of samples in history, None for infinite

  • preserve – True to preserve the current history, False to drop

set_max_samplerate(n_streams: int = 0) float

Set the maximum samplerate which can achieve the desired number of streams.

Parameters:

n_streams – number of streams required, 0 for current streams

Returns:

samplerate chosen

Return type:

float

set_register(address: str | int, value: int) None

Write a register value.

See also

Parameters:
  • address – register name or address

  • value – register value

set_samplerate(fs: float) float

Set the streaming samplerate.

Not all possible samplerates are supported. The closest achievable rate to the request will be used and returned.

Parameters:

fs – requested samplerate in Hz

Returns:

achieved samplerate in Hz

Return type:

float

set_stream_block_size(block_size: int = 10) None

Set the size of stream blocks.

Set the size of stream sample blocks. If the size is different than the current block size all buffered samples are dropped and new buffers/queues allocated for the new size.

Parameters:

block_size – max number of samples to hold in a block

stream_sync(reset_pps: bool = False, timeout: float = 10) None

Reset the streaming sample counter.

Device streams don’t have a timestamp but a sample counter which increments each sample regardless of the current samplerate (available via the ‘n’ stream). This function synchronizes various internal operations within the device and resets the sample counter to 0.

A stream_sync typically must be performed at least once after device power on, but may be issued any time after that to start counting samples from 0 again.

Devices with a PPS will delay the sync to the next rising PPS edge such that sample index 0 begins at the rising edge. The internal PPS clock counter can optionally be reset at the same time.

Parameters:
  • reset_pps – True to reset the PPS clock counter on sync

  • timeout – seconds to wait for sync, 0 for no wait

write_queue_drain(timeout: float = 1) None

Blocks until the write data queue has been drained.

This can be used after set_register() or configure_stream() with non-blocking operation to ensure the request has been sent to the device.

Parameters:

timeout – time in seconds to wait for drain to complete

class pyflind.smx00.SMx00StateMap

Bases: BidirectionalStrIntMap

SMx00 state name to id mapper.

Can be used to interpret the logic_module_state stream value.

class pyflind.smx00.SMx00LEDColor(*values)

Bases: Enum

SMx00 LED colors

off = 0
red = 1
orange = 2
yellow = 3
lime = 4
cyan = 5
blue = 6
purple = 7
pink = 8
white = 9
class pyflind.smx00.SMx00Error

Bases: Exception

General purpose exception for SMx00 errors.