

Python remote_write prometheus

21 Feb 2022

https://pypi.org/project/opentelemetry-exporter-prometheus-remote-write/#installation

pip install opentelemetry-exporter-prometheus-remote-write
pip uninstall snappy python-snappy
pip install python-snappy
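
The uninstall/reinstall dance is there because remote write payloads are snappy-compressed protobufs, and (as far as I can tell) the unrelated snappy package on PyPI shadows the snappy module that python-snappy provides, so only python-snappy should be left installed.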

https://aws.amazon.com/blogs/opensource/building-a-prometheus-remote-write-exporter-for-the-opentelemetry-python-sdk/

import numpy as np
from perlin_noise import PerlinNoise

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.prometheus_remote_write import (
    PrometheusRemoteWriteMetricsExporter,
)

# Sets the global MeterProvider instance
metrics.set_meter_provider(MeterProvider())

# The Meter is responsible for creating and recording metrics. Each meter has a unique name, which we set as the module's name here.
meter = metrics.get_meter(__name__)

exporter = PrometheusRemoteWriteMetricsExporter(
    endpoint="https://prometheus-prod-10-prod-us-central-0.grafana.net/api/prom/push",
    timeout=30,
    basic_auth={
        "username": "user",
        "password": "passwod",
    },
)

metrics.get_meter_provider().start_pipeline(meter, exporter, 5)

labels = {
    "service.name": "service123",
    "service.version": "1.2.3",
    "job": "custom-test",
}
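
# Quick smoke test (my addition, not from the AWS post): a counter created on
# the same meter is picked up by the pipeline started above and exported with
# the shared labels dict every 5 seconds.
smoke_counter = meter.create_counter(
    name="smoke_test_requests",
    description="smoke-test counter",
    unit="1",
    value_type=int,
)
smoke_counter.add(1, labels)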

class CPU:
    """Fake CPU gauge: a smooth Perlin-noise curve pre-scaled to 0-100."""

    def __init__(self):
        self.i = 0
        ng = PerlinNoise(octaves=5)
        arr = [ng(i * 0.01) for i in range(10000)]
        map_arr = np.array(arr)
        # Normalize to [0, 1], then scale to a percentage.
        map_arr = (map_arr - map_arr.min()) / (map_arr.max() - map_arr.min())
        self.map_arr = map_arr * 100

    def get_val(self):
        val = self.map_arr[self.i]
        # Wrap around instead of running off the end of the array.
        self.i = (self.i + 1) % len(self.map_arr)
        return val

    def get_cpu_usage_callback(self, observer):
        val = self.get_val()
        clabels = {
            **labels,
            "cpu_scope": "all",
        }
        print(val)
        observer.observe(float(val), clabels)

cpu = CPU()

meter.register_valueobserver(
    callback=cpu.get_cpu_usage_callback,
    name="cpu_percent",
    description="per-cpu usage",
    unit="1",
    value_type=float,
)
"""
exporter = PrometheusRemoteWriteMetricsExporter(
    endpoint="http://localhost:9009/api/prom/push",
    timeout=30,
    basic_auth={
        "username": "user",
        "password": "pass123",
    },
    headers={
        "X-Scope-Org-ID": "5",
        "Authorization": "Bearer mytoken123",
    },
    proxies={
        "http": "http://10.10.1.10:3000",
        "https": "http://10.10.1.10:1080",
    },
    tls_config={
        "cert_file": "path/to/file",
        "key_file": "path/to/file",
        "ca_file": "path_to_file",
        "insecure_skip_verify": true, # for developing purposes
    }
)
"""

Examples

https://github.com/open-telemetry/opentelemetry-python-contrib/blob/metrics/exporter/opentelemetry-exporter-prometheus-remote-write/examples/sampleapp.py


import logging
import random
import sys
import time
from logging import INFO

import psutil

from opentelemetry import metrics
from opentelemetry.exporter.prometheus_remote_write import (
    PrometheusRemoteWriteMetricsExporter,
)
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export.aggregate import (
    HistogramAggregator,
    LastValueAggregator,
    MinMaxSumCountAggregator,
    SumAggregator,
)
from opentelemetry.sdk.metrics.view import View, ViewConfig

logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)

metrics.set_meter_provider(MeterProvider())
meter = metrics.get_meter(__name__)
exporter = PrometheusRemoteWriteMetricsExporter(
    endpoint="http://cortex:9009/api/prom/push",
    headers={"X-Scope-Org-ID": "5"},
)
metrics.get_meter_provider().start_pipeline(meter, exporter, 1)
testing_labels = {"environment": "testing"}


# Callback to gather cpu usage
def get_cpu_usage_callback(observer):
    for (number, percent) in enumerate(psutil.cpu_percent(percpu=True)):
        labels = {"cpu_number": str(number)}
        observer.observe(percent, labels)


# Callback to gather RAM usage
def get_ram_usage_callback(observer):
    ram_percent = psutil.virtual_memory().percent
    observer.observe(ram_percent, {})


requests_counter = meter.create_counter(
    name="requests",
    description="number of requests",
    unit="1",
    value_type=int,
)

request_min_max = meter.create_counter(
    name="requests_min_max",
    description="min max sum count of requests",
    unit="1",
    value_type=int,
)

request_last_value = meter.create_counter(
    name="requests_last_value",
    description="last value number of requests",
    unit="1",
    value_type=int,
)

requests_size = meter.create_valuerecorder(
    name="requests_size",
    description="size of requests",
    unit="1",
    value_type=int,
)

requests_size_histogram = meter.create_valuerecorder(
    name="requests_size_histogram",
    description="histogram of request_size",
    unit="1",
    value_type=int,
)
requests_active = meter.create_updowncounter(
    name="requests_active",
    description="number of active requests",
    unit="1",
    value_type=int,
)

meter.register_sumobserver(
    callback=get_ram_usage_callback,
    name="ram_usage",
    description="ram usage",
    unit="1",
    value_type=float,
)

meter.register_valueobserver(
    callback=get_cpu_usage_callback,
    name="cpu_percent",
    description="per-cpu usage",
    unit="1",
    value_type=float,
)


counter_view1 = View(
    requests_counter,
    SumAggregator,
    label_keys=["environment"],
    view_config=ViewConfig.LABEL_KEYS,
)
counter_view2 = View(
    request_min_max,
    MinMaxSumCountAggregator,
    label_keys=["os_type"],
    view_config=ViewConfig.LABEL_KEYS,
)

counter_view3 = View(
    request_last_value,
    LastValueAggregator,
    label_keys=["environment"],
    view_config=ViewConfig.UNGROUPED,
)
size_view = View(
    requests_size_histogram,
    HistogramAggregator,
    label_keys=["environment"],
    aggregator_config={"bounds": [20, 40, 60, 80, 100]},
    view_config=ViewConfig.UNGROUPED,
)
meter.register_view(counter_view1)
meter.register_view(counter_view2)
meter.register_view(counter_view3)
meter.register_view(size_view)
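# My note, not from the upstream sample: as I read the pre-1.0 view API,
# ViewConfig.LABEL_KEYS aggregates series by only the listed label_keys,
# while ViewConfig.UNGROUPED keeps every incoming label set as its own series.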

# Load generator
num = random.randint(0, 1000)
while True:
    # counters
    requests_counter.add(num % 131 + 200, testing_labels)
    request_min_max.add(num % 181 + 200, testing_labels)
    request_last_value.add(num % 101 + 200, testing_labels)

    # updown counter
    requests_active.add(num % 7231 + 200, testing_labels)

    # value observers
    requests_size.record(num % 6101 + 100, testing_labels)
    requests_size_histogram.record(num % 113, testing_labels)
    logger.log(level=INFO, msg="completed metrics collection cycle")
    time.sleep(1)
    num += 9791

Metric types


  • counter
  • updowncounter
  • valuerecorder
  • sumobserver
  • updownsumobserver
  • valueobserver
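
Each of these maps onto a call on the pre-1.0 Meter. A minimal sketch, assuming the same SDK as the sample app above (register_updownsumobserver is the only one the sample doesn't exercise; the callback here is a stub):

# Observer callbacks receive an observer and record via observer.observe(value, labels).
def noop_cb(observer):
    observer.observe(0.0, {})

meter.create_counter(name="c", description="counter", unit="1", value_type=int)
meter.create_updowncounter(name="ud", description="updowncounter", unit="1", value_type=int)
meter.create_valuerecorder(name="vr", description="valuerecorder", unit="1", value_type=int)
meter.register_sumobserver(callback=noop_cb, name="so", description="sumobserver", unit="1", value_type=float)
meter.register_updownsumobserver(callback=noop_cb, name="udso", description="updownsumobserver", unit="1", value_type=float)
meter.register_valueobserver(callback=noop_cb, name="vo", description="valueobserver", unit="1", value_type=float)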

https://logz.io/blog/python-custom-metrics-remotewrite-sdk/

ValueRecorder

import logging
import random
import sys
import time
from logging import INFO


from opentelemetry import metrics
from opentelemetry.exporter.prometheus_remote_write import (
    PrometheusRemoteWriteMetricsExporter,
)
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export.aggregate import (
    HistogramAggregator,
    LastValueAggregator,
    MinMaxSumCountAggregator,
    SumAggregator,
)
from opentelemetry.sdk.metrics.view import View, ViewConfig

logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)

metrics.set_meter_provider(MeterProvider())
meter = metrics.get_meter(__name__)

exporter = PrometheusRemoteWriteMetricsExporter(
    endpoint="http://localhost:9090/api/v1/write",
    timeout=30,
)

metrics.get_meter_provider().start_pipeline(meter, exporter, 1)

requests_size = meter.create_valuerecorder(
    name="requests_size2",
    description="size of requests",
    unit="1",
    value_type=int,
)

for i in range(100):
    label = {
        "test": str(i)
    }
    print("label", label)
    requests_size.record(i, label)
    time.sleep(1)
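
Note that the endpoint here is Prometheus itself (:9090/api/v1/write), which only accepts pushes when its remote write receiver is enabled; see "Deploy receiver" below.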


Parallel

def parallel(func, data, nthread=4):
    """Map func over data using a pool of nthread workers."""
    from multiprocessing import Pool
    from contextlib import closing
    with closing(Pool(nthread)) as p:
        rs = p.map(func, data)
    return rs

def work(tid):
    label = {
        "tid": f"thread#{tid}",
    }
    v = random.randint(10, 20)
    print("label", label, v)
    requests_size.record(random.randint(10, 20), label)
    time.sleep(1)

parallel(work, [i%10 for i in range(100)], 4)
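
A caveat worth flagging: multiprocessing.Pool forks worker processes, so each record() lands in the child's copy of the SDK state, not in the parent pipeline that start_pipeline launched, and samples may never be exported. If metrics go missing, a thread-based pool keeps everything in one process; a sketch using the stdlib ThreadPool:

from multiprocessing.pool import ThreadPool
from contextlib import closing

def parallel_threads(func, data, nthread=4):
    # Threads share the parent's meter and push pipeline, unlike forked processes.
    with closing(ThreadPool(nthread)) as p:
        return p.map(func, data)

parallel_threads(work, [i % 10 for i in range(100)], 4)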

Remove labels

To strip unwanted labels from the outgoing series, patch the exporter in place:

/usr/local/lib/python3.6/site-packages/opentelemetry/exporter/prometheus_remote_write/__init__.py

Add _last

To also export the last observed value alongside min/max/sum/count, patch the SDK's aggregate module: in /usr/local/lib/python3.6/site-packages/opentelemetry/sdk/metrics/export/aggregate.py, rename the existing MinMaxSumCountAggregator class to MinMaxSumCountAggregator2, then add this replacement class to the same file:

class MinMaxSumCountAggregator(Aggregator):
    """Same as MinMaxSumCount but also with last value."""

    _TYPE = namedtuple("minmaxsumcountlast", "min max sum count last")

    def __init__(self, config=None):
        super().__init__(config=config)
        self.mmsc = MinMaxSumCountAggregator2()
        self.current = None
        self.checkpoint = self._TYPE(None, None, None, 0, None)

    def update(self, value):
        self.mmsc.update(value)
        self.current = value
        super().update(value)

    def take_checkpoint(self):
        self.mmsc.take_checkpoint()
        self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (self.current,)))
        super().take_checkpoint()

    def merge(self, other):
        if self._verify_type(other):
            self.mmsc.merge(other.mmsc)
            last = self.checkpoint.last

            self.last_update_timestamp = max(
                self.last_update_timestamp, other.last_update_timestamp
            )

            # Take other's last value when other updated at least as recently.
            if self.last_update_timestamp == other.last_update_timestamp:
                last = other.checkpoint.last
            self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (last,)))
            super().merge(other)

Finally, make the exporter emit the new field: add a _last sample next to the existing _min/_max/_sum/_count conversion in /usr/local/lib/python3.6/site-packages/opentelemetry/exporter/prometheus_remote_write/__init__.py.
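
I won't reproduce the exporter internals here, but the shape of that change is: wherever the exporter turns a MinMaxSumCount checkpoint into _min/_max/_sum/_count samples, emit one extra sample from the new last field. A hypothetical sketch (the function name is illustrative, not the exporter's real API):

def mmsc_checkpoint_to_samples(checkpoint, base_name):
    # Hypothetical helper mirroring the per-aggregator conversion.
    return {
        base_name + "_min": checkpoint.min,
        base_name + "_max": checkpoint.max,
        base_name + "_sum": checkpoint.sum,
        base_name + "_count": checkpoint.count,
        base_name + "_last": checkpoint.last,  # new field from the patched aggregator
    }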

Deploy receiver

https://prometheus.io/docs/prometheus/latest/storage/#overview

The built-in remote write receiver can be enabled by setting the --web.enable-remote-write-receiver command line flag.
When enabled, the remote write receiver endpoint is /api/v1/write.
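
So a local receiver can be as simple as (assuming a prometheus binary with a default prometheus.yml):

./prometheus --config.file=prometheus.yml --web.enable-remote-write-receiver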

Solve the out-of-order sample error

Pushing from multiple workers at once can land samples with older timestamps after newer ones, which the receiver rejects:

ts=2022-05-14T03:27:27.753Z caller=write_handler.go:102 level=error component=web msg="Out of order sample from remote write" err="out of order sample" series="{__name__=\"cpu_min\", thread=\"t06\"}" timestamp=1652498774953
ts=2022-05-14T03:27:27.757Z caller=write_handler.go:102 level=error component=web msg="Out of order sample from remote write" err="out of order sample" series="{__name__=\"cpu_min\", thread=\"t03\"}" timestamp=1652498774953
ts=2022-05-14T03:27:27.767Z caller=write_handler.go:102 level=error component=web msg="Out of order sample from remote write" err="out of order sample" series="{__name__=\"cpu_min\", thread=\"t07\"}" timestamp=1652498753917

https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/app/vmagent

Download the vmutils-* archive from the releases page, then run vmagent in front of Prometheus with a single remote write queue (one queue should keep samples in order):

./vmagent-prod -remoteWrite.url=http://localhost:9090/api/v1/write -remoteWrite.queues=1
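
vmagent itself accepts the remote write protocol (on :8429/api/v1/write by default, if I recall its defaults correctly), so the Python exporter can target vmagent and let it relay to Prometheus:

exporter = PrometheusRemoteWriteMetricsExporter(
    endpoint="http://localhost:8429/api/v1/write",  # vmagent, which forwards to Prometheus
    timeout=30,
)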