2. Example Applications

2.1. simple_writer.py

This application demonstrates the use of the Metric module to publish ten random values to the specified DesignSpark Cloud database.

from DesignSpark.Cloud import Metrics
import random
import time

DSM_INSTANCE = ""
DSM_KEY = ""
DSM_URL = ""

m = Metrics.Metric(instance=DSM_INSTANCE, key=DSM_KEY, url=DSM_URL)
for x in range(10):
	status = m.write({"name":"test_metric", "value":random.randint(1, 50), "label1":"this_label", "label2":"that_label"})
	if status:
		print("Successfully posted")
	else:
		print("Unsuccessful post, result {}".format(status))
	time.sleep(1)

The application first imports the necessary modules, then configures an instance of the Metric class with the provided DSM credentials. Ten data points with random values and a metric name of “test_metric” are published over the course of ten seconds.

The write function expects a dictionary containing, at minimum, the keys “name” and “value”, which hold the metric name and metric value. Labels can be included, where the key name is the label name; in this example two keys named “label1” and “label2” are included.
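
For example, reusing the Metric instance m created above, a minimal payload and a payload carrying two labels look like this:

# Minimum required keys: "name" and "value"
m.write({"name": "test_metric", "value": 42})

# Additional keys become labels on the data point
m.write({"name": "test_metric", "value": 42, "label1": "this_label", "label2": "that_label"})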

A success message is printed for every successful post; otherwise an error message containing the status returned by metric.write is printed.

2.2. simple_reader.py

This application demonstrates the use of the prometheus-api-client module to read metrics from DesignSpark Cloud.

from DesignSpark.Cloud import Metrics
from datetime import datetime, timedelta
from prometheus_api_client import PrometheusConnect, MetricsList

DSM_INSTANCE = ""
DSM_KEY = ""
DSM_URL = ""

m = Metrics.Metric(instance=DSM_INSTANCE, key=DSM_KEY, url=DSM_URL)

reader_uri = m.getReadURI()

reader = PrometheusConnect(reader_uri)

metricData = reader.get_metric_range_data(metric_name="test_metric", start_time=(datetime.now() - timedelta(hours=24, minutes=0)), end_time=datetime.now())

metricList = MetricsList(metricData)

for item in metricList:
	print(item)

The application imports both the Metric module and prometheus_api_client, which provides functions for retrieving and manipulating metrics.

The Metric class is instantiated just as before in the simple_writer.py example, and the function getReadURI() is called on it. This function returns a URI that can be passed into an instance of prometheus_api_client.prometheus_connect.PrometheusConnect(), which will then connect to the DesignSpark Cloud using the provided credentials.

A metric series called “test_metric” (the same as what is written in simple_writer.py) is retrieved with a start and end date provided. A number of additional arguments are available, which are described in the prometheus_api_client documentation.
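
As a sketch of those additional arguments, reusing the reader object from the listing above, the label_config parameter can narrow the query to a particular label value, and the result can be loaded into a pandas DataFrame with the MetricRangeDataFrame helper from the same library (the label value shown is the one written by simple_writer.py):

from datetime import datetime, timedelta
from prometheus_api_client import MetricRangeDataFrame

# Only return series whose label1 matches the value written by simple_writer.py
filteredData = reader.get_metric_range_data(
	metric_name="test_metric",
	label_config={"label1": "this_label"},
	start_time=(datetime.now() - timedelta(hours=24)),
	end_time=datetime.now()
)

# Flatten the result into a pandas DataFrame indexed by timestamp
df = MetricRangeDataFrame(filteredData)
print(df.head())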

2.3. mqtt2dsm.py

This application demonstrates a practical use of the Metric module to write data from specified MQTT topics to DesignSpark Cloud, utilising the paho-mqtt library to connect to a broker.

			print("Error publishing! Reason {}".format(status))


def main():
	global PROMETHEUS_CONFIG, TOPICS_CONFIG, MQTT_CONFIG, writer
	client.on_connect = mqtt_connect
	client.on_message = mqtt_message

	with open(CONFIG_FILE) as fileHandle:
		fileData = fileHandle.read()

	configData = json.loads(fileData)

	DSM_CONFIG = configData['dsm']
	TOPICS_CONFIG = configData['topics']
	MQTT_CONFIG = configData['mqtt']

	writer = Metrics.Metric(instance=DSM_CONFIG['instance'], key=DSM_CONFIG['key'], url=DSM_CONFIG['url'])

	if "username" and "password" in MQTT_CONFIG.items():
		client.username_pw_set(MQTT_CONFIG["username"], MQTT_CONFIG["password"])

The main() function in the program first registers two callbacks with the instantiated MQTT client for when a successful connection is made, and when an MQTT message is received. A JSON formatted configuration file is then loaded, and the Metric object and MQTT client configured based on provided values. The MQTT client then connects and spins in a loop waiting for incoming messages on subscribed topics.
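
The connection and message loop are not shown in the excerpt above, but with paho-mqtt the end of main() would look something like this minimal sketch, using the broker address and port from the mqtt section of the configuration file:

	# Connect to the configured broker and block, dispatching the callbacks as events arrive
	client.connect(MQTT_CONFIG["broker"], MQTT_CONFIG["port"])
	client.loop_forever()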

def mqtt_connect(client, userdata, flags, rc):
	global TOPICS_CONFIG
	print("Successfully connected to MQTT broker")

	topics = list(TOPICS_CONFIG.keys())

	# Subscribe method expects a list of tuples containing (topic, QoS)
	client.subscribe([(topic, 0) for topic in topics])

	print("Successfully subscribed to all topics")

The mqtt_connect callback handles subscribing to the configured topics, using a list comprehension to create the list of tuples required by the subscribe method.
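
With the example topics section shown later in this section (a single topic, sensors/bedroom/temperature), the comprehension produces the following list:

# One (topic, QoS) tuple per configured topic
[("sensors/bedroom/temperature", 0)]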

def mqtt_message(client, userdata, msg):
	global TOPICS_CONFIG
	payload = msg.payload
	print("Received MQTT message from topic {} with payload {}".format(msg.topic, payload))

	if msg.topic in TOPICS_CONFIG.keys():
		metricsData = copy.deepcopy(TOPICS_CONFIG[msg.topic])

		if "labels" in metricsData:
			labelData = metricsData["labels"]
		else:
			labelData = {}

		metricName = metricsData.pop("metric")

		prometheusData = {"name": metricName, "value": payload}
		prometheusData.update(labelData)

		status = writer.write(prometheusData)

		if status:
			print("Successfully published to Prometheus")
		else:
			print("Error publishing! Reason {}".format(status))

The mqtt_message function publishes received MQTT values to the DesignSpark Cloud. A sanity check is first performed to ensure the topic is present in the configuration file, and a deep copy of the topic configuration is then made so values can be inserted and removed without affecting the original. The metric key from the configuration is popped and inserted into a new dictionary under the “name” key required by the Metric module, the “value” key is set to the received MQTT message payload, and the data point is then published to the cloud.
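
As a worked example, the snippet below performs the same steps in isolation for the sensors/bedroom/temperature entry from the example configuration, assuming a hypothetical payload of b"21.5":

import copy

TOPICS_CONFIG = {
	"sensors/bedroom/temperature": {
		"metric": "bedroom_temperature",
		"labels": {"room": "bedroom", "sensorType": "temperature", "sensorId": 1}
	}
}

payload = b"21.5"  # hypothetical MQTT payload
metricsData = copy.deepcopy(TOPICS_CONFIG["sensors/bedroom/temperature"])
labelData = metricsData.get("labels", {})
metricName = metricsData.pop("metric")

prometheusData = {"name": metricName, "value": payload}
prometheusData.update(labelData)

# {'name': 'bedroom_temperature', 'value': b'21.5', 'room': 'bedroom', 'sensorType': 'temperature', 'sensorId': 1}
print(prometheusData)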

	"mqtt": {
		"broker":"localhost",
		"port": 1883
	},

Within the configuration file multiple JSON objects are present. The mqtt object needs, at minimum, keys called broker and port, which set the MQTT broker address and port. Optional username and password keys can be included should the broker require authentication.

	"dsm": {
		"url":"",
		"instance":"",
		"key":""
	},

A section called dsm is mandatory and must contain keys called url, instance and key, which are provided by DesignSpark Cloud when registering. The authentication key must have permission to write data.

	"topics": {
		"sensors/bedroom/temperature": {
			"metric": "bedroom_temperature",
			"labels": {
				"room": "bedroom",
				"sensorType": "temperature",
				"sensorId": 1
			}
		}
	}

The final section is called topics and contains a number of objects where the object key is the MQTT topic to be subscribed to — in the example we’ve used sensors/bedroom/temperature. Each topic object must contain a key called metric which is the name of the metric. An optional section called labels can be included which then becomes labels on the data point within DesignSpark Cloud.
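
Putting the three sections together, a complete configuration file might look like the following sketch; the username and password values are placeholders included only to illustrate an authenticated broker:

{
	"mqtt": {
		"broker": "localhost",
		"port": 1883,
		"username": "mqtt_user",
		"password": "mqtt_password"
	},
	"dsm": {
		"url": "",
		"instance": "",
		"key": ""
	},
	"topics": {
		"sensors/bedroom/temperature": {
			"metric": "bedroom_temperature",
			"labels": {
				"room": "bedroom",
				"sensorType": "temperature",
				"sensorId": 1
			}
		}
	}
}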

2.4. BatteryTest.py

This application interfaces with an RS Pro electronic load to perform battery discharge tests and logs data to DesignSpark Cloud.

2.4.1. Command line switches

A number of command line switches are available:

| Short name | Long name | Description | Type | Example |
|------------|-----------|-------------|------|---------|
| -d | --device | The PyVISA device string | String | -d "ASRL6::INSTR" |
| -b | --baud | The PyVISA device baud rate | Integer | -b 115200 |
| -c | --capacity | The battery capacity to discharge to in amp-hours | Integer | -c 110 |
| -a | --amperage | The discharge current in amps | Float | -a 5 |
| -v | --voltage | The low voltage cut off point in volts | Float | -v 10.5 |
| -t | --time | Time to discharge cut off in minutes | Integer | -t 10 |
| -i | --instance | DesignSpark Cloud instance | String | -i 123456 |
| -u | --url | DesignSpark Cloud URL | String | -u "https://prometheus-prod-01-eu-west-0.grafana.net" |
| -k | --key | DesignSpark Cloud key | String | -k "YOUR_AUTHENTICATION_KEY" |
| -n | --name | Battery name label | String | -n "Battery123" |

An example command: python3 BatteryTest.py -d "ASRL6::INSTR" -b 115200 -c 10 -a 2.5 -t 10 -v 10.5 -i 123456 -k "YOUR_AUTH_KEY" -u "https://prometheus-prod-01-eu-west-0.grafana.net" -n "batterytest"

2.4.2. Code explanation

	try:
		rm = pyvisa.ResourceManager('@py')
		load = rm.open_resource(args.device)
		load.baud_rate = args.baud
		device_idn = load.query('*IDN?')
		logging.info("Device identifier: {}".format(device_idn[:-1]))
	except Exception as e:
		logging.error("Could not establish device connection, reason {}".format(e))
		exit()

The script first attempts to parse all the required command line arguments, then creates an object for communicating with the instrument. The *IDN? query asks the instrument to identify itself, and the response is then logged to the console.
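
If the PyVISA device string for the -d switch is not known, the instruments visible to the pyvisa-py backend can be listed with a short sketch like this:

import pyvisa

# Print the resource strings of all instruments the pyvisa-py backend can see
rm = pyvisa.ResourceManager('@py')
print(rm.list_resources())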

	try:
		batt_string = f":BATT 1,{args.amperage + 3}A,{args.amperage}A,{args.voltage}V,{args.capacity}AH,{args.time}M"
		logging.info("Setting battery mode to {}".format(batt_string))
		load.write(batt_string)
		time.sleep(1)
		load.write(':RCL:BATT 1')
		time.sleep(1)
		logging.info("Turning on input")
		load.write(':INP ON')
	except Exception as e:
		logging.error("Failed configuring and enabling instrument, reason {}".format(e))

A command string that programs a battery test regime is compiled and sent to the electronic load, and the input is then turned on, which starts the discharge programme.
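
As a worked example, with the arguments from the example command given earlier (-a 2.5, -v 10.5, -c 10, -t 10) the compiled string evaluates as follows; note that the first current field is simply args.amperage plus 3:

amperage, voltage, capacity, minutes = 2.5, 10.5, 10, 10
batt_string = f":BATT 1,{amperage + 3}A,{amperage}A,{voltage}V,{capacity}AH,{minutes}M"
print(batt_string)  # :BATT 1,5.5A,2.5A,10.5V,10AH,10M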

	while load.query(':INP?') == 'ON\n':
		capacity_list = []
		voltage_list = []
		current_list = []
		for x in range(60):
			if load.query(':INP?') == 'ON\n':
				batt_capacity = float(load.query(':BATT:CAP?')[:-3])
				capacity_list.append(batt_capacity)
				batt_time = float(load.query(':BATT:TIM?')[:-2])
				batt_voltage = round(float(load.query(':MEAS:VOLT?')[:-2]), 2)
				voltage_list.append(batt_voltage)
				batt_current = round(float(load.query(':MEAS:CURR?')[:-2]), 2)
				current_list.append(batt_current)

				batt_mins = int((batt_time * 60) % 60)
				batt_secs = int((batt_time * 3600) % 60)
				logging.info("Capacity: {}Ah, voltage: {}, current: {}, time: {}:{}".format(batt_capacity, batt_voltage, batt_current, batt_mins, batt_secs))
				time.sleep(1)
			else:
				break

The instrument is polled once every second whilst the input is switched on, with readings of capacity, voltage and current appended to lists ready to be averaged. All the information is also logged to the console.

		if load.query(':INP?') == 'ON\n':
			capacity = sum(capacity_list) / len(capacity_list)
			status = m.write({"name":"batt_capacity", "value":capacity, "battery_name":args.name})
			if status:
				logging.info("Successfully published capacity to Prometheus")
			else:
				logging.error("Error publishing! Reason {}".format(status))

			voltage = sum(voltage_list) / len(voltage_list)
			status = m.write({"name":"batt_voltage", "value":voltage, "battery_name":args.name})
			if status:
				logging.info("Successfully published voltage to Prometheus")
			else:
				logging.error("Error publishing! Reason {}".format(status))

			current = sum(current_list) / len(current_list)
			status = m.write({"name":"batt_current", "value":current, "battery_name":args.name})
			if status:
				logging.info("Successfully published current to Prometheus")
			else:
				logging.error("Error publishing! Reason {}".format(status))

Once every minute an average of the last sixty seconds of data points is computed and published to DesignSpark Cloud.

The while loop exits once the input is turned off, which is performed automatically by the instrument when a cut off point is reached (one or more of voltage, capacity or time). After the loop exits, one last set of queries is made to the instrument to get the capacity and elapsed time, which are then published to DesignSpark Cloud. This information is also mirrored to the console.
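
That final readout is not shown in the excerpts above, but based on the queries already used in the polling loop it presumably looks something like the following sketch; the metric names used here are illustrative only:

	# Final readings once the instrument has switched its input off
	final_capacity = float(load.query(':BATT:CAP?')[:-3])
	final_time = float(load.query(':BATT:TIM?')[:-2])
	logging.info("Final capacity: {}Ah, elapsed time: {}".format(final_capacity, final_time))

	# Metric names here are illustrative; the script may use different ones
	status = m.write({"name":"batt_final_capacity", "value":final_capacity, "battery_name":args.name})
	if status:
		logging.info("Successfully published final capacity to Prometheus")
	else:
		logging.error("Error publishing! Reason {}".format(status))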