2. Example Applications
2.1. simple_writer.py
This application demonstrates the use of the Metric module to publish ten random values to the specified DesignSpark Cloud database.
from DesignSpark.Cloud import Metrics
import random
import time

# Credentials supplied by DesignSpark Cloud; fill these in before running
DSM_INSTANCE = ""
DSM_KEY = ""
DSM_URL = ""

m = Metrics.Metric(instance=DSM_INSTANCE, key=DSM_KEY, url=DSM_URL)

# Publish ten random values, one per second
for x in range(10):
    status = m.write({"name":"test_metric", "value":random.randint(1, 50), "label1":"this_label", "label2":"that_label"})
    # write() returns True on success; any other value describes the failure
    if status is True:
        print("Successfully posted")
    else:
        print("Unsuccessful post, result {}".format(status))
    time.sleep(1)
The application first imports the necessary modules, then configures an instance of the Metric class with the provided DSM credentials. Ten data points with random values and a metric name of “test_metric” are published over the course of ten seconds.
The write function expects a dictionary containing, at a minimum, the keys “name” and “value”, which hold the metric name and value. Labels can be included as additional keys, where the key name is the label name; in this example two labels named “label1” and “label2” are included.
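The dictionary layout described above can be assembled with a small helper. The sketch below is illustrative only: make_data_point is a hypothetical name, not part of the DesignSpark.Cloud library, and the checks it performs are assumptions about what is sensible rather than documented requirements.

```python
def make_data_point(name, value, labels=None):
    """Build a dictionary in the layout described for Metric.write.

    Hypothetical helper for illustration; not part of DesignSpark.Cloud.
    """
    if not name:
        raise ValueError("a metric name is required")

    labels = dict(labels or {})
    # "name" and "value" carry the metric itself, so they cannot
    # double as label names.
    reserved = {"name", "value"}.intersection(labels)
    if reserved:
        raise ValueError("reserved label names: {}".format(sorted(reserved)))

    data_point = {"name": name, "value": value}
    data_point.update(labels)
    return data_point


point = make_data_point(
    "test_metric", 42, {"label1": "this_label", "label2": "that_label"}
)
print(point)
```

The resulting dictionary can be passed directly to the write function of a configured Metric object.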
A success message is printed after every successful post; otherwise an error message containing the status returned by metric.write is printed.
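Because the write function returns True on success and some other value on failure, the status check can be wrapped in a simple retry loop. This is a minimal sketch, assuming that retrying a failed post is acceptable for the data in question; write_with_retry and its parameters are hypothetical and not part of the library. Any callable can be passed as write, so the example uses a stand-in function in place of a real Metric object.

```python
import time


def write_with_retry(write, data, attempts=3, delay=1.0):
    """Call write(data) until it returns True or the attempts run out.

    Returns (True, None) on success, or (False, last_status) on failure.
    """
    status = None
    for attempt in range(attempts):
        status = write(data)
        if status is True:
            return True, None
        # Pause before the next attempt, but not after the final one
        if attempt < attempts - 1:
            time.sleep(delay)
    return False, status


# Stand-in for Metric.write that fails twice and then succeeds
calls = []


def flaky_write(data):
    calls.append(data)
    return True if len(calls) >= 3 else "simulated failure"


ok, last_status = write_with_retry(
    flaky_write, {"name": "test_metric", "value": 1}, delay=0
)
print(ok, len(calls))
```

With a real Metric object named m, the call would be write_with_retry(m.write, data).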
2.2. simple_reader.py
This application demonstrates the use of the prometheus-api-client module to read metrics from DesignSpark Cloud.
from DesignSpark.Cloud import Metrics
from datetime import datetime, timedelta
from prometheus_api_client import PrometheusConnect, MetricsList

# Credentials supplied by DesignSpark Cloud; fill these in before running
DSM_INSTANCE = ""
DSM_KEY = ""
DSM_URL = ""

m = Metrics.Metric(instance=DSM_INSTANCE, key=DSM_KEY, url=DSM_URL)

# Build a read URI from the credentials and connect to it
reader_uri = m.getReadURI()
reader = PrometheusConnect(reader_uri)

# Fetch the last 24 hours of "test_metric"
end_time = datetime.now()
metricData = reader.get_metric_range_data(metric_name="test_metric", start_time=end_time - timedelta(hours=24), end_time=end_time)
metricList = MetricsList(metricData)

for item in metricList:
    print(item)
The application imports both the Metric module and prometheus_api_client, which provides functions for retrieving and manipulating metrics.
The Metric class is instantiated just as in the simple_writer.py example, and the function metric.getReadURI() is called. This function returns a URI that can be passed to an instance of prometheus_api_client.prometheus_connect.PrometheusConnect(), which then connects to DesignSpark Cloud using the provided credentials.
A metric series called “test_metric” (the same one written by simple_writer.py) is retrieved for the period between the provided start and end times; in this example, the last 24 hours. A number of additional arguments are available, which are described in the prometheus_api_client documentation.
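The data returned by get_metric_range_data follows the Prometheus range-query format: a list with one dictionary per series, each holding the series labels under “metric” and a list of [timestamp, value] pairs under “values”, with each value encoded as a string. The sketch below flattens that structure into plain Python tuples. flatten_series is a hypothetical helper, and the sample data is invented for illustration rather than taken from DesignSpark Cloud.

```python
from datetime import datetime, timezone


def flatten_series(metric_data):
    """Turn range-query results into (name, time, value) tuples."""
    rows = []
    for series in metric_data:
        name = series["metric"].get("__name__", "")
        for timestamp, value in series["values"]:
            # Timestamps are Unix seconds; values arrive as strings
            moment = datetime.fromtimestamp(float(timestamp), tz=timezone.utc)
            rows.append((name, moment, float(value)))
    return rows


# Invented sample in the shape of a range-query result
sample = [
    {
        "metric": {"__name__": "test_metric", "label1": "this_label"},
        "values": [[1700000000, "12"], [1700000060, "37"]],
    }
]

rows = flatten_series(sample)
for name, moment, value in rows:
    print(name, moment.isoformat(), value)
```

In simple_reader.py the metricData variable could be passed to this function in place of sample.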
2.3. mqtt2dsm.py
This application demonstrates a practical use of the Metric module to write data from specified MQTT topics to DesignSpark Cloud, utilising the paho-mqtt
library to connect to a broker.
def main():
    global DSM_CONFIG, TOPICS_CONFIG, MQTT_CONFIG, writer

    # Register the connection and message callbacks
    client.on_connect = mqtt_connect
    client.on_message = mqtt_message

    # Load the JSON configuration file
    with open(CONFIG_FILE) as fileHandle:
        fileData = fileHandle.read()
        configData = json.loads(fileData)

    DSM_CONFIG = configData['dsm']
    TOPICS_CONFIG = configData['topics']
    MQTT_CONFIG = configData['mqtt']

    writer = Metrics.Metric(instance=DSM_CONFIG['instance'], key=DSM_CONFIG['key'], url=DSM_CONFIG['url'])

    # Only set credentials when both optional keys are present
    if "username" in MQTT_CONFIG and "password" in MQTT_CONFIG:
        client.username_pw_set(MQTT_CONFIG["username"], MQTT_CONFIG["password"])
The main() function first registers two callbacks with the instantiated MQTT client: one for when a successful connection is made and one for when an MQTT message is received. A JSON-formatted configuration file is then loaded, and the Metric object and MQTT client are configured from the values it provides. The MQTT client then connects and spins in a loop waiting for incoming messages on the subscribed topics.
def mqtt_connect(client, userdata, flags, rc):
    global TOPICS_CONFIG
    print("Successfully connected to MQTT broker")
    topics = list(TOPICS_CONFIG.keys())
    # Subscribe method expects a list of tuples containing (topic, QoS)
    client.subscribe([(topic, 0) for topic in topics])
    print("Successfully subscribed to all topics")
The mqtt_connect callback subscribes to the configured topics, using a list comprehension to create the list of tuples required by the subscribe method.
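The list comprehension can be tried in isolation, without a broker. In this sketch build_subscriptions is a hypothetical helper, and the second topic in the sample configuration is invented to show that one tuple is produced per configured topic.

```python
def build_subscriptions(topics_config, qos=0):
    """Return a list of (topic, qos) tuples, one per configured topic."""
    return [(topic, qos) for topic in topics_config]


topics_config = {
    "sensors/bedroom/temperature": {"metric": "bedroom_temperature"},
    # Invented second topic, for illustration only
    "sensors/kitchen/humidity": {"metric": "kitchen_humidity"},
}

subscriptions = build_subscriptions(topics_config)
print(subscriptions)
```

Every topic is subscribed with QoS 0 here, matching the callback above; a different QoS can be chosen through the qos argument.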
def mqtt_message(client, userdata, msg):
    global TOPICS_CONFIG
    payload = msg.payload
    print("Received MQTT message from topic {} with payload {}".format(msg.topic, payload))

    # Ignore topics that are not present in the configuration
    if msg.topic in TOPICS_CONFIG.keys():
        # Work on a copy so the stored configuration is not modified
        metricsData = copy.deepcopy(TOPICS_CONFIG[msg.topic])

        if "labels" in metricsData:
            labelData = metricsData["labels"]
        else:
            labelData = {}

        metricName = metricsData.pop("metric")
        prometheusData = {"name": metricName, "value": payload}
        prometheusData.update(labelData)

        status = writer.write(prometheusData)
        # write() returns True on success; any other value describes the failure
        if status is True:
            print("Successfully published to Prometheus")
        else:
            print("Error publishing! Reason {}".format(status))
The mqtt_message function publishes MQTT values to DesignSpark Cloud. A sanity check is first performed to ensure the topic is present in the configuration file, then a copy of the topic data is made so that values can be inserted and removed without altering the configuration. The “metric” key from the configuration is popped and its value inserted into a new dictionary under the “name” key required by the Metric module. The “value” key is then set to the received MQTT message payload, any labels are merged in, and the data point is published to the cloud.
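The mapping from an incoming message to a data point can be separated from the MQTT and cloud calls, which makes it easy to check on its own. The sketch below is one way to do that, not the script's own code: payload_to_data_point is a hypothetical function, and converting the payload to a float is an assumption about the sensor's message format (paho-mqtt delivers payloads as bytes).

```python
def payload_to_data_point(topic, payload, topics_config):
    """Map an MQTT message to a data point, or None for unknown topics."""
    topic_config = topics_config.get(topic)
    if topic_config is None:
        return None

    # Assumption: payloads are UTF-8 text holding a number, e.g. b"21.5"
    if isinstance(payload, bytes):
        payload = payload.decode("utf-8")
    value = float(payload)

    data_point = {"name": topic_config["metric"], "value": value}
    # Reading with get() leaves the stored configuration untouched,
    # so no deep copy is needed in this version
    data_point.update(topic_config.get("labels", {}))
    return data_point


topics_config = {
    "sensors/bedroom/temperature": {
        "metric": "bedroom_temperature",
        "labels": {"room": "bedroom", "sensorType": "temperature", "sensorId": 1},
    }
}

point = payload_to_data_point("sensors/bedroom/temperature", b"21.5", topics_config)
print(point)
print(payload_to_data_point("sensors/unknown", b"1", topics_config))
```

The returned dictionary has the same shape as the one passed to writer.write in the callback.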
"mqtt": {
"broker":"localhost",
"port": 1883
},
The configuration file contains multiple JSON objects. The “mqtt” object must include at least the keys “broker” and “port”, which set the MQTT broker address and port. The optional keys “username” and “password” can be added should the broker require authentication.
"dsm": {
    "url":"",
    "instance":"",
    "key":""
},
A section called “dsm” is mandatory and must contain the keys “url”, “instance” and “key”, the values of which are provided by DesignSpark Cloud when registering. The authentication key must have permission to write data.
"topics": {
    "sensors/bedroom/temperature": {
        "metric": "bedroom_temperature",
        "labels": {
            "room": "bedroom",
            "sensorType": "temperature",
            "sensorId": 1
        }
    }
}
The final section is called “topics” and contains a number of objects where the object key is the MQTT topic to be subscribed to; in the example we’ve used sensors/bedroom/temperature. Each topic object must contain a key called “metric”, which is the name of the metric. An optional section called “labels” can be included, the entries of which become labels on the data point within DesignSpark Cloud.
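The requirements for the three sections can be checked before the script connects to anything. The following sketch assembles the fragments above into one document and validates it; validate_config and REQUIRED_KEYS are hypothetical names, and treating “topics” as a required section is an assumption based on main() reading it unconditionally.

```python
import json

# Mandatory keys for each section, as described in the text
REQUIRED_KEYS = {
    "mqtt": ("broker", "port"),
    "dsm": ("url", "instance", "key"),
}


def validate_config(config):
    """Return a list of human-readable problems; empty means valid."""
    problems = []
    for section, keys in REQUIRED_KEYS.items():
        if section not in config:
            problems.append("missing section '{}'".format(section))
            continue
        for key in keys:
            if key not in config[section]:
                problems.append("'{}' is missing key '{}'".format(section, key))

    topics = config.get("topics")
    if topics is None:
        problems.append("missing section 'topics'")
    else:
        for topic, settings in topics.items():
            if "metric" not in settings:
                problems.append("topic '{}' is missing key 'metric'".format(topic))
    return problems


example = json.loads("""
{
    "mqtt": {"broker": "localhost", "port": 1883},
    "dsm": {"url": "", "instance": "", "key": ""},
    "topics": {
        "sensors/bedroom/temperature": {
            "metric": "bedroom_temperature",
            "labels": {"room": "bedroom", "sensorType": "temperature", "sensorId": 1}
        }
    }
}
""")

print(validate_config(example))
```

The check only confirms that the keys exist; empty credential strings, as in the example, still need to be filled in with real values.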
2.4. BatteryTest.py
This application interfaces with an RS Pro electronic load to perform battery discharge tests and logs data to DesignSpark Cloud.
2.4.1. Command line switches
The script accepts the following command line switches:
Short name | Long name | Description | Type | Example |
---|---|---|---|---|
-d | --device | The PyVISA device string | String | -d "ASRL6::INSTR" |
-b | --baud | The PyVISA device baud rate | Integer | -b 115200 |
-c | --capacity | The battery capacity to discharge to in amp-hours | Integer | -c 110 |
-a | --amperage | The discharge current in amps | Float | -a 5 |
-v | --voltage | The low voltage cut off point in volts | Float | -v 10.5 |
-t | --time | Time to discharge cut off in minutes | Integer | -t 10 |
-i | --instance | DesignSpark Cloud instance | String | -i 123456 |
-u | --url | DesignSpark Cloud URL | String | |
-k | --key | DesignSpark Cloud key | String | -k "YOUR_AUTHENTICATION_KEY" |
-n | --name | Battery name label | String | -n "Battery123" |
An example command: python3 BatteryTest.py -d "ASRL6::INSTR" -b 115200 -c 10 -a 2.5 -t 10 -v 10.5 -i 123456 -k "YOUR_AUTH_KEY" -u "https://prometheus-prod-01-eu-west-0.grafana.net" -n "batterytest"
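The switches in the table could be declared with argparse along the following lines. This is a sketch only: whether the original script uses argparse, which switches it treats as required, and what defaults it applies are not stated here, so build_parser and the parser description are assumptions. The example parses the same arguments as the command above.

```python
import argparse


def build_parser():
    """Declare the switches listed in the table, with the listed types."""
    parser = argparse.ArgumentParser(description="Battery discharge test")
    parser.add_argument("-d", "--device", type=str, help="The PyVISA device string")
    parser.add_argument("-b", "--baud", type=int, help="The PyVISA device baud rate")
    parser.add_argument("-c", "--capacity", type=int, help="Capacity to discharge to in amp-hours")
    parser.add_argument("-a", "--amperage", type=float, help="The discharge current in amps")
    parser.add_argument("-v", "--voltage", type=float, help="The low voltage cut off point in volts")
    parser.add_argument("-t", "--time", type=int, help="Time to discharge cut off in minutes")
    parser.add_argument("-i", "--instance", type=str, help="DesignSpark Cloud instance")
    parser.add_argument("-u", "--url", type=str, help="DesignSpark Cloud URL")
    parser.add_argument("-k", "--key", type=str, help="DesignSpark Cloud key")
    parser.add_argument("-n", "--name", type=str, help="Battery name label")
    return parser


# Same arguments as the example command, passed as a list
args = build_parser().parse_args([
    "-d", "ASRL6::INSTR", "-b", "115200", "-c", "10", "-a", "2.5",
    "-t", "10", "-v", "10.5", "-i", "123456", "-k", "YOUR_AUTH_KEY",
    "-u", "https://prometheus-prod-01-eu-west-0.grafana.net",
    "-n", "batterytest",
])
print(args.device, args.baud, args.amperage, args.voltage)
```

Note that the instance is kept as a string even though it looks numeric, in line with the type given in the table.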
2.4.2. Code explanation
try:
    # Open the instrument using the pure-Python PyVISA backend
    rm = pyvisa.ResourceManager('@py')
    load = rm.open_resource(args.device)
    load.baud_rate = args.baud
    # Ask the instrument to identify itself; the trailing newline is stripped
    device_idn = load.query('*IDN?')
    logging.info("Device identifier: {}".format(device_idn[:-1]))
except Exception as e:
    logging.error("Could not establish device connection, reason {}".format(e))
    exit()
The script first attempts to parse all the required command line arguments, then creates an object for communicating with the instrument. The instrument query *IDN? is executed, which asks the instrument to identify itself, and the response is logged to the console. If the connection cannot be established, the error is logged and the script exits.
try:
    # Compose the battery test parameters from the command line arguments
    batt_string = f":BATT 1,{args.amperage + 3}A,{args.amperage}A,{args.voltage}V,{args.capacity}AH,{args.time}M"
    logging.info("Setting battery mode to {}".format(batt_string))
    load.write(batt_string)
    time.sleep(1)
    load.write(':RCL:BATT 1')
    time.sleep(1)
    # Switching the input on starts the discharge
    logging.info("Turning on input")
    load.write(':INP ON')
except Exception as e:
    logging.error("Failed configuring and enabling instrument, reason {}".format(e))
A command string describing the battery test regime is compiled from the command line arguments and sent to the electronic load. The regime is then recalled on the instrument and the input turned on, which starts the discharge programme.
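The command string can be built and inspected without an instrument attached. The sketch below reproduces the field order used by the script; build_batt_command is a hypothetical function, and the parameter names index and margin are labels chosen here, since the meaning of the leading 1 and of the first current field (the discharge current plus 3 A) is not explained in this section.

```python
def build_batt_command(amperage, voltage, capacity, minutes, index=1, margin=3):
    """Compose the :BATT string in the same field order as the script."""
    return ":BATT {},{}A,{}A,{}V,{}AH,{}M".format(
        index, amperage + margin, amperage, voltage, capacity, minutes
    )


command = build_batt_command(amperage=2.5, voltage=10.5, capacity=10, minutes=10)
print(command)
```

For the example command shown earlier this produces :BATT 1,5.5A,2.5A,10.5V,10AH,10M.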
while load.query(':INP?') == 'ON\n':
    # Start a fresh set of samples for each one-minute window
    capacity_list = []
    voltage_list = []
    current_list = []

    for x in range(60):
        if load.query(':INP?') == 'ON\n':
            # Slicing strips the trailing unit and newline from each reply
            batt_capacity = float(load.query(':BATT:CAP?')[:-3])
            capacity_list.append(batt_capacity)
            batt_time = float(load.query(':BATT:TIM?')[:-2])
            batt_voltage = round(float(load.query(':MEAS:VOLT?')[:-2]), 2)
            voltage_list.append(batt_voltage)
            batt_current = round(float(load.query(':MEAS:CURR?')[:-2]), 2)
            current_list.append(batt_current)

            # Split the elapsed time into minutes and seconds for logging
            batt_mins = int((batt_time * 60) % 60)
            batt_secs = int((batt_time * 3600) % 60)
            logging.info("Capacity: {}Ah, voltage: {}, current: {}, time: {}:{}".format(batt_capacity, batt_voltage, batt_current, batt_mins, batt_secs))
            time.sleep(1)
        else:
            break
The instrument is polled once every second whilst the input is switched on, with readings of the discharged capacity, voltage and current appended to lists ready to be averaged. Each set of readings is logged to the console.
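The reply parsing and time formatting can be exercised without hardware. In this sketch parse_reading and format_elapsed are hypothetical helpers: the reply strings are assumed to be a number followed by a unit and a newline, as the slicing in the script suggests, and the elapsed time is assumed to be reported in hours, as the arithmetic in the script implies. A regular expression is used here in place of fixed-length slicing so that the unit length does not matter.

```python
import re

# A leading decimal number, optionally signed and with an exponent
_NUMBER = re.compile(r"\s*([-+]?\d*\.?\d+(?:[eE][-+]?\d+)?)")


def parse_reading(reply):
    """Extract the leading number from an instrument reply."""
    match = _NUMBER.match(reply)
    if match is None:
        raise ValueError("no number in reply: {!r}".format(reply))
    return float(match.group(1))


def format_elapsed(hours):
    """Format a fractional number of hours as HH:MM:SS."""
    total_seconds = int(round(hours * 3600))
    minutes, seconds = divmod(total_seconds, 60)
    whole_hours, minutes = divmod(minutes, 60)
    return "{:02d}:{:02d}:{:02d}".format(whole_hours, minutes, seconds)


# Assumed reply formats, for illustration only
print(parse_reading("12.34V\n"), parse_reading("0.123AH\n"))
print(format_elapsed(1.5))
```

Zero-padding the fields keeps log lines aligned, and including the hours avoids ambiguity on discharges that run longer than sixty minutes.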
# Still inside the while loop, after the sixty-sample for loop
if load.query(':INP?') == 'ON\n':
    capacity = sum(capacity_list) / len(capacity_list)
    status = m.write({"name":"batt_capacity", "value":capacity, "battery_name":args.name})
    # write() returns True on success; any other value describes the failure
    if status is True:
        logging.info("Successfully published capacity to Prometheus")
    else:
        logging.error("Error publishing! Reason {}".format(status))

    voltage = sum(voltage_list) / len(voltage_list)
    status = m.write({"name":"batt_voltage", "value":voltage, "battery_name":args.name})
    if status is True:
        logging.info("Successfully published voltage to Prometheus")
    else:
        logging.error("Error publishing! Reason {}".format(status))

    current = sum(current_list) / len(current_list)
    status = m.write({"name":"batt_current", "value":current, "battery_name":args.name})
    if status is True:
        logging.info("Successfully published current to Prometheus")
    else:
        logging.error("Error publishing! Reason {}".format(status))
Once every minute an average of the last sixty seconds of data points is computed and published to DesignSpark Cloud.
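The three near-identical publish steps can be expressed as one loop over the sample lists. The following sketch shows the idea with a stand-in for the write function so that it runs without credentials; publish_averages and fake_write are hypothetical names, and skipping empty lists is an added safeguard rather than behaviour taken from the script.

```python
from statistics import fmean


def publish_averages(write, readings, battery_name):
    """Average each list of samples and publish it through write().

    readings maps a metric name to its list of samples. Returns a
    dictionary of metric name to write() status; empty lists are skipped.
    """
    results = {}
    for metric_name, samples in readings.items():
        if not samples:
            continue
        data_point = {
            "name": metric_name,
            "value": fmean(samples),
            "battery_name": battery_name,
        }
        results[metric_name] = write(data_point)
    return results


# Stand-in for Metric.write that records what would be published
published = []


def fake_write(data_point):
    published.append(data_point)
    return True


results = publish_averages(
    fake_write,
    {"batt_voltage": [12.0, 12.5, 13.0], "batt_current": [2.5, 2.5], "batt_capacity": []},
    "batterytest",
)
print(results)
```

With a real Metric object named m, the first argument would be m.write and the returned statuses could be logged as in the script.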
The while loop exits once the input is turned off, which the instrument does automatically when a cut off point is reached (one or more of voltage, capacity or time). After the loop exits, one last set of queries is made to the instrument to get the final capacity and time, which are then published to DesignSpark Cloud. This information is also mirrored to the console.