Module library

Bootloader

Universal MicroPython Application Loader.

class umal.ApplicationInfo(name=None, version=None, settings=None, application=None, platform_info: Optional[umal.PlatformInfo] = None)

Collects information about the application. Most importantly, it carries the settings for further use in the datalogger.

class umal.GenericChronometer

A millisecond chronometer implemented with vanilla MicroPython. https://micropython.readthedocs.io/en/latest/pyboard/tutorial/timer.html#making-a-microsecond-counter
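A minimal sketch of such a chronometer, assuming MicroPython's `time.ticks_ms()` and `time.ticks_diff()` are available and falling back to `time.monotonic()` on CPython. The class name `SketchChronometer` and its methods are hypothetical and do not describe the actual interface of `umal.GenericChronometer`.

```python
import time


class SketchChronometer:
    """Hypothetical millisecond stopwatch; not the real umal.GenericChronometer."""

    def __init__(self):
        self.start = self._now()

    @staticmethod
    def _now():
        # MicroPython provides ticks_ms(); CPython does not, so fall back.
        if hasattr(time, "ticks_ms"):
            return time.ticks_ms()
        return int(time.monotonic() * 1000)

    def reset(self):
        # Restart the measurement from the current instant.
        self.start = self._now()

    def read(self):
        # Elapsed milliseconds since construction or the last reset().
        now = self._now()
        if hasattr(time, "ticks_diff"):
            # ticks_diff() copes with wrap-around of the tick counter.
            return time.ticks_diff(now, self.start)
        return now - self.start
```

Using `ticks_diff()` instead of plain subtraction matters on MicroPython because the tick counter wraps around after a port-specific period.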

class umal.McuFamily

Constants for designating different MCU families.

class umal.MicroPythonBootloader

Very basic tasks for running the system.

extend_syspath()

Extend Python module search path.

Dependency modules are shipped through the “dist-packages” folder. Please populate this folder appropriately as shown above before expecting anything to work.

The default PYTHONPATH as found on the different platforms is:

  • Vanilla MicroPython: [‘’, ‘/lib’]

  • Pycom MicroPython: [‘’, ‘/flash’, ‘/flash/lib’]
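Extending the search path could be sketched as follows. The function name `extend_syspath_sketch`, the `base` parameter and the choice to insert at the front of `sys.path` are assumptions made for illustration; the actual `extend_syspath()` may differ.

```python
import sys


def extend_syspath_sketch(base=""):
    """Hypothetical helper: add the dist-packages folder to the module search path."""
    # Assumption: dependencies live in "<base>/dist-packages".
    candidate = base + "/dist-packages"
    if candidate not in sys.path:
        # Inserting at the front lets bundled modules win over
        # same-named modules found later on the path.
        sys.path.insert(0, candidate)
    return sys.path
```

On a Pycom device, calling `extend_syspath_sketch("/flash")` would place ‘/flash/dist-packages’ in front of the default entries listed above.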

class umal.MicroPythonVendor

Constants for designating different vendors of the MicroPython platform.

class umal.PlatformInfo

Platform info object bundling the combination of MCU family and MicroPython vendor to assist in platform switching mechanics throughout the program.

MCU

alias of umal.McuFamily

MICROPYTHON

alias of umal.MicroPythonVendor

resolve_platform()

Resolve MCU and vendor of this platform.

Application modules

class terkin.datalogger.TerkinDatalogger(settings, platform_info: Optional[umal.PlatformInfo] = None)

Main class of the project. Handles the loop and sleep, registers sensors, reads their data and stores it. Shows up as ‘datalogger’ in the rest of the program.

duty_cycle()

Main duty cycle

duty_task()

Main duty cycle task.

get_sleep_time()

Calculate the next sleep interval.

read_sensors() → terkin.model.DataFrame

Read measurements from all sensor objects that have been registered in the sensor_manager. Reading is done with the read() function of each respective sensor object.
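The reading loop described here can be pictured roughly as below. `FakeSensor` and `read_all_sketch` are hypothetical names, and both the plain `dict` result (instead of a `terkin.model.DataFrame`) and the decision to skip failing sensors are assumptions of this sketch.

```python
class FakeSensor:
    """Hypothetical sensor exposing the read() method mentioned above."""

    def __init__(self, reading):
        self.reading = reading

    def read(self):
        return self.reading


def read_all_sketch(sensors):
    """Hypothetical: merge the readings of all sensors into one dict."""
    data = {}
    for sensor in sensors:
        try:
            reading = sensor.read()
        except Exception:
            # Assumption: one failing sensor should not abort the whole cycle.
            continue
        if reading:
            data.update(reading)
    return data
```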

record_reading(sensor, reading, richdata)
Parameters
  • sensor

  • reading

  • richdata

register_sensor(sensor_info)

Register one sensor.

register_sensors()

Configure and register sensor objects. There are three types of sensors: system, environment & buses.

The sensors are registered by calling their respective classes from terkin/driver.

Definitions are in ‘settings.py’.

scale_wizard()

Invoke scale adjustment wizard.

Synopsis:

  • Invoke Terkin datalogger.

  • Interrupt by pressing CTRL+C.

  • Type datalogger.scale_wizard().

shutoff()

Shut off the MCU.

sleep()

Sleep or shutoff until the next measurement cycle.

start_buttons()

Configure ESP32 touchpads.

transmit_readings(dataframe: terkin.model.DataFrame)

Transmit data

Parameters

dataframe

class terkin.datalogger.TransientStorage

Store last sensor reading for serving through HTTP API.

class terkin.device.DeviceStatus

Object holding device status information.

class terkin.device.TerkinDevice(application_info: umal.ApplicationInfo)

Singleton object for enabling different device-related subsystems and providing lowlevel routines for sleep/resume functionality.

Parameters
  • color – rgb value as three hex values 0-255, e.g. 0x00FF00 for green

  • count – (Default value = 1)

configure_rgb_led()

https://docs.pycom.io/tutorials/all/rgbled.html

create_telemetry_adapter(telemetry_target)
Parameters

telemetry_target

hibernate(interval, lightsleep=False, deepsleep=False)
Parameters
  • interval

  • lightsleep – (Default value = False)

  • deepsleep – (Default value = False)

power_off_bluetooth()

We don’t use Bluetooth yet.

power_off_lte_modem()

We don’t use LTE yet.

Important

Once the LTE radio is initialized, it must be de-initialized before going to deepsleep in order to ensure minimum power consumption. This is required due to the LTE radio being powered independently and allowing use cases which require the system to be taken out from deepsleep by an event from the LTE network (data or SMS received for instance).

Note

When using the expansion board and the FiPy together, the RTS/CTS jumpers MUST be removed as those pins are being used by the LTE radio. Keeping those jumpers in place will lead to erratic operation and higher current consumption, especially while in deepsleep.

https://forum.pycom.io/topic/3090/fipy-current-consumption-analysis/17

print_bootscreen()

Print bootscreen.

This contains important details about your device and the operating system running on it.

run_gc()

Curate the garbage collector. https://docs.pycom.io/firmwareapi/micropython/gc.html

For a “quick fix”, issue the following periodically. https://community.hiveeyes.org/t/timing-things-on-micropython-for-esp32/2329/9
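The snippet that this text refers to is not reproduced in this reference. As a hedged stand-in, the sketch below follows a pattern found in MicroPython documentation for `gc.threshold()`; whether the linked post recommends exactly this is an assumption. The function name `run_gc_sketch` is hypothetical, and the `hasattr` guard only exists so the example also runs on CPython.

```python
import gc


def run_gc_sketch():
    """Hypothetical periodic GC helper; guarded so it also runs on CPython."""
    # Reclaim unreachable objects right away.
    gc.collect()
    # MicroPython only: trigger an automatic collection once roughly a
    # quarter of the currently free heap has been allocated again.
    if hasattr(gc, "mem_free") and hasattr(gc, "mem_alloc"):
        gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
```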

start_networking()

Start all configured networking devices.

start_rtc()

The RTC is used to keep track of the date and time. Syncs the RTC with an NTP server.

Sensors

class terkin.sensor.core.I2CBus(settings)

Initialize the I2C hardware driver and represent as bus object.

power_off()

Turn off the I2C peripheral.

https://docs.pycom.io/firmwareapi/pycom/machine/i2c.html

power_on()

Turn on the I2C peripheral after power off.

class terkin.sensor.core.OneWireBus(settings)

Initialize the 1-Wire hardware driver and represent as bus object.

static device_address_ascii(address)
Parameters

address

class terkin.sensor.core.SensorManager(settings)

Manages all buses and sensors.

get_bus_by_name(name)
Parameters

name

get_bus_by_sensortype(type)

Return the bus for this sensor type.

get_sensor_by_id(id)

Return sensor with defined id.

get_sensor_by_name(name)
Parameters

name

get_sensor_by_type(type)

Return all sensors filtered by type.

get_sensors_by_family(family)

Return all sensors filtered by family.

power_off()

Send power-off to all buses and sensors

power_on()

Send power-on to all buses and sensors

power_toggle_buses(action)
Parameters

action

power_toggle_sensors(action)
Parameters

action

register_bus(bus)
Parameters

bus

register_sensor(sensor)
Parameters

sensor

setup_bus(bus_settings)
Parameters

bus_settings

setup_buses(buses_settings)

Register configured I2C, OneWire and SPI buses.

Parameters

buses_settings

Network

class terkin.network.wifi.SystemWiFiMetrics(settings, station)

Runtime information from the WiFi bearer.

exception terkin.network.wifi.WiFiException

class terkin.network.wifi.WiFiManager(manager, settings)

Manages all things relevant to the WiFi connection.

auth_mode_nvs_key(ssid)

Hack to get a short representation of a WiFi SSID in order to squeeze it into an NVRAM key with a maximum length of 15 characters.

Fixme: Review this.

Parameters

ssid
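One way to squeeze an arbitrary SSID into 15 characters is to hash it and keep a prefix of the digest. This is only an illustration of the idea: the function name, the `wa.` prefix and the use of SHA-256 are assumptions, not the scheme `auth_mode_nvs_key()` actually uses.

```python
import binascii
import hashlib


def short_nvs_key_sketch(ssid, prefix="wa.", max_length=15):
    """Hypothetical: derive a key of at most `max_length` characters from an SSID."""
    digest = hashlib.sha256(ssid.encode("utf-8")).digest()
    # Hex-encode the digest and keep only as many characters as still fit.
    hexdigest = binascii.hexlify(digest).decode("ascii")
    return prefix + hexdigest[: max_length - len(prefix)]
```

Truncating the digest makes collisions between different SSIDs possible in principle, which is acceptable only when the number of stored networks is small.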

connect_station(network)
Parameters

network

connect_stations(network_names)
Parameters

network_names

forget_network(network_name)
Parameters

network_name

get_auth_mode(network_name)
Parameters

network_name

humanize_mac_addresses(mac)
Parameters

mac

is_connected()

Check if connected to WiFi access point and connection yielded a valid IP address.

power_off()

Power off all radio peripherals.

start_interface()

Genuine MicroPython: https://docs.micropython.org/en/latest/library/network.WLAN.html

Pycom MicroPython: https://docs.pycom.io/tutorials/all/wlan.html https://github.com/pycom/pydocs/blob/master/firmwareapi/pycom/network/wlan.md

wait_for_connection(timeout=15.0)

Wait for network to arrive.

Parameters

timeout – (Default value = 15.0)
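Waiting with a timeout usually comes down to a polling loop. The sketch below assumes a callable `is_connected` and a hypothetical `interval` parameter; it is not the actual body of `wait_for_connection()`.

```python
import time


def wait_for_connection_sketch(is_connected, timeout=15.0, interval=0.25):
    """Hypothetical: poll `is_connected()` until it is true or `timeout` seconds pass."""
    waited = 0.0
    while waited < timeout:
        if is_connected():
            return True
        time.sleep(interval)
        waited += interval
    # One last check after the final sleep.
    return is_connected()
```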

class terkin.network.lora.LoRaDriverDragino(network_manager, settings)

LoRa driver for Dragino LoRa/GPS HAT on Raspberry Pi.

class terkin.network.lora.LoRaDriverPycom(network_manager, settings)

LoRa driver for Pycom MicroPython

create_socket()

Create socket for LoRa communication

receive()

Receive a LoRa packet.

send(payload)

Send a LoRa packet.

Parameters

payload

start()

Start driver

wait_for_join()

Wait for device activation to complete.

Telemetry

class terkin.telemetry.core.BeepBobTopology(adapter=None)

Define how to communicate with BEEP for BOB.

https://en.wikipedia.org/wiki/Bebop

encode(dataframe: terkin.model.DataFrame)

Encode telemetry data matching the BEEP-BOB interface.

https://gist.github.com/vkuhlen/51f7968266659f37d076bd66d57cdbbd https://github.com/Hiverize/FiPy/blob/master/logger/beep.py

Example:

{
    't': 22.66734,
    'h': 52.41612,
    'p': 1002.334,
    'weight': 10.0,
    't_i_1': 23.1875,
    't_i_2': 23.125,
    't_i_3': 23.125,
    't_i_4': 23.1875
}
Parameters

dataframe

class terkin.telemetry.core.CSVTelemetryAdapter(*args, **kwargs)

Telemetry node client: Network participant API.

This will make your node talk CSV.

transmit(data, **kwargs)
Parameters
  • data

  • kwargs

class terkin.telemetry.core.IdentityTopology

Apply no transformation to the telemetry payload. Serves as the base class for derived topologies.

encode(dataframe: terkin.model.DataFrame)
Parameters

dataframe – defined in terkin/model.py

Moves data_in to data_out (why?)
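A pass-through encoder of this kind can be sketched in a few lines. `DataFrameSketch` is a hypothetical stand-in for `terkin.model.DataFrame` that carries only the two attributes named above, and `IdentityTopologySketch` is likewise an illustrative name.

```python
class DataFrameSketch:
    """Hypothetical stand-in for terkin.model.DataFrame with only two fields."""

    def __init__(self, data_in=None):
        self.data_in = data_in
        self.data_out = None


class IdentityTopologySketch:
    """Hypothetical pass-through topology."""

    def encode(self, dataframe):
        # No transformation: the outgoing data is the incoming data.
        dataframe.data_out = dataframe.data_in
        return dataframe
```

One plausible reason for keeping this step is that later stages can always read `data_out`, regardless of whether a derived topology changed the data.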

class terkin.telemetry.core.MQTTAdapter(client_id, server, port=0, username=None, password=None)

MQTT adapter wrapping the lowlevel MQTT driver. Handles a single connection to an MQTT broker.

TODO: Try to make this module reasonably compatible again by becoming an adapter for different implementations. E.g., what about Paho?

connect()

Connect to MQTT broker

ensure_connection()

Conditionally connect to MQTT broker, if not connected already
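The conditional connect can be pictured as a guard around `connect()`. The class `MQTTAdapterSketch`, its `connected` flag and the `driver` argument are hypothetical and only illustrate the pattern.

```python
class MQTTAdapterSketch:
    """Hypothetical adapter; `driver` is any object with a connect() method."""

    def __init__(self, driver):
        self.driver = driver
        self.connected = False

    def connect(self):
        self.driver.connect()
        self.connected = True

    def ensure_connection(self):
        # Only connect when no connection has been established yet.
        if not self.connected:
            self.connect()
```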

load_driver()

Load MQTT driver module

publish(topic, payload, retain=False, qos=1)
Parameters
  • topic

  • payload

  • retain – (Default value = False)

  • qos – (Default value = 1)

class terkin.telemetry.core.MqttKitTopology

This defines how to communicate in WAN scenarios having a decent number of devices rolled out. While this would cover even earth-scale addressing scenarios, it will also give you peace of mind in smaller setups like multi-project or multi-tenant scenarios. Even for single users, the infinite number of available channels is very convenient for ad hoc operation scenarios.

class terkin.telemetry.core.TelemetryAdapter(device=None, target=None)

Telemetry node client: Network participant API.

Todo: Implement exponential backoff instead of MAX_FAILURES.
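The exponential backoff mentioned in the to-do note could look roughly like this. Nothing here exists in the adapter today; the function names, the default values and the injected `sleep` callable are all assumptions of the sketch.

```python
def backoff_delays(base=1.0, factor=2.0, cap=60.0, attempts=6):
    """Yield the wait time in seconds before each retry, growing up to `cap`."""
    delay = base
    for _ in range(attempts):
        yield min(delay, cap)
        delay *= factor


def transmit_with_backoff(transmit, sleep, **kwargs):
    """Hypothetical: call `transmit()` until it succeeds or the delays run out."""
    for delay in backoff_delays(**kwargs):
        try:
            return transmit()
        except Exception:
            # Wait longer after each consecutive failure.
            sleep(delay)
    return None
```

Passing `sleep` as an argument keeps the sketch testable; on a device it would typically be `time.sleep`.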

format_uri(**kwargs)
Parameters

kwargs

transform(dataframe: terkin.model.DataFrame)
Parameters

dataframe

transmit(dataframe: terkin.model.DataFrame)
Parameters

dataframe

class terkin.telemetry.core.TelemetryClient(interface, uri, format, content_encoding=None, uri_suffixes=None, settings=None, networking=None)

A flexible telemetry data client wrapping access to different transport adapters and serialization mechanisms.

get_handler(uri)
Parameters

uri

serialize(dataframe: terkin.model.DataFrame)
Parameters

dataframe – defined in terkin/model.py

Moves and reformats data from dataframe.data_out to dataframe.payload_out.
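As an illustration of this step, the sketch below serializes `data_out` to JSON and stores the result in `payload_out`. JSON is only one assumed format, and `OutFrameSketch` is a hypothetical stand-in for `terkin.model.DataFrame`.

```python
import json


class OutFrameSketch:
    """Hypothetical stand-in for terkin.model.DataFrame."""

    def __init__(self, data_out=None):
        self.data_out = data_out
        self.payload_out = None


def serialize_sketch(dataframe):
    """Hypothetical: reformat `data_out` as JSON text and store it in `payload_out`."""
    dataframe.payload_out = json.dumps(dataframe.data_out)
    return dataframe
```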

transmit(dataframe: terkin.model.DataFrame, uri=None, serialize=True)
Parameters
  • dataframe

  • uri – (Default value = None)

  • serialize – (Default value = True)

class terkin.telemetry.core.TelemetryManager

Manage a number of telemetry adapters.

add_adapter(adapter)
Parameters

adapter

transmit(dataframe: terkin.model.DataFrame)
Parameters

dataframe

class terkin.telemetry.core.TelemetryTopology

Define how to communicate using Telemetry.

class terkin.telemetry.core.TelemetryTransportMQTT(uri, format)

MQTT transport for Terkin Telemetry.

This is currently based on the “Pycom MicroPython MQTT module” just called mqtt.py. https://github.com/pycom/pycom-libraries/blob/master/lib/mqtt/mqtt.py

Originally, this was based on the “umqtt.robust” library:

micropython -m upip install micropython-umqtt.robust micropython-umqtt.simple

send(dataframe: terkin.model.DataFrame)
Parameters

dataframe – defined in terkin/model.py

Sends the data in dataframe.payload_out to a server via MQTT.

terkin.telemetry.formatter.to_cayenne_lpp_hiveeyes(dataframe: terkin.model.DataFrame)

Serialize dataframe to binary CayenneLPP format.

Parameters

dataframe

terkin.telemetry.formatter.to_cayenne_lpp_ratrack(dataframe: terkin.model.DataFrame)

Serialize dataframe to binary CayenneLPP format.

Parameters

dataframe

terkin.telemetry.formatter.to_csv(dataframe: terkin.model.DataFrame)

Return a simple CSV string with headers
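A CSV string with a header line can be built from a flat mapping as sketched below. The function takes a plain `dict` rather than a `terkin.model.DataFrame`, sorts the keys for a stable column order, and does no quoting or escaping; all of these are assumptions of the sketch rather than properties of `to_csv()`.

```python
def to_csv_sketch(data):
    """Hypothetical: render a flat dict as a header line plus one value line."""
    # Sort the keys so the column order is deterministic.
    keys = sorted(data.keys())
    header = ",".join(keys)
    values = ",".join(str(data[key]) for key in keys)
    return header + "\n" + values + "\n"
```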