https://pypi.org/project/opentelemetry-exporter-prometheus-remote-write/#installation
pip install opentelemetry-exporter-prometheus-remote-write
pip uninstall snappy python-snappy
pip install python-snappy
https://aws.amazon.com/blogs/opensource/building-a-prometheus-remote-write-exporter-for-the-opentelemetry-python-sdk/
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.prometheus_remote_write import (
PrometheusRemoteWriteMetricsExporter
)
# Install a fresh MeterProvider as the global provider for this process.
metrics.set_meter_provider(MeterProvider())
# The Meter is responsible for creating and recording metrics. Each meter has
# a unique name, which we set as the module's name here.
meter = metrics.get_meter(__name__)
# Remote-write exporter that pushes to a grafana.net endpoint using HTTP basic auth.
# NOTE(review): "user" / "passwod" look like placeholders — supply real
# credentials from the environment or a secret store, never from source.
exporter = PrometheusRemoteWriteMetricsExporter(
endpoint="https://prometheus-prod-10-prod-us-central-0.grafana.net/api/prom/push",
timeout=30,
basic_auth={
"username": "user",
"password": "passwod",
},
)
# Connect the meter to the exporter.
# NOTE(review): the third argument (5) is presumably the export interval in
# seconds — confirm against MeterProvider.start_pipeline.
metrics.get_meter_provider().start_pipeline(meter, exporter, 5)
# Labels shared by every observation; CPU.get_cpu_usage_callback merges them
# into the labels it attaches to each value.
labels = {
"service.name": "service123",
"service.version": "1.2.3",
"job": "custom-test",
}
class CPU:
    """Synthetic CPU-usage source that replays a pre-computed Perlin-noise curve.

    On construction, 10000 noise samples are generated, min-max normalised and
    scaled to the range 0-100. Each call to ``get_val`` hands out the next
    sample; once the table is exhausted it starts again from the beginning
    instead of raising ``IndexError``.

    NOTE(review): ``PerlinNoise`` and ``np`` are not imported in the visible
    part of these notes — presumably ``from perlin_noise import PerlinNoise``
    and ``import numpy as np``; confirm before running.
    """

    def __init__(self):
        self.noiseGenerator = PerlinNoise(octaves=5)
        # Count of samples handed out so far; only ever increases.
        self.i = 0
        # Sample the generator at 0.01 increments.
        raw = np.array([self.noiseGenerator(k * 0.01) for k in range(10000)])
        lowest = raw.min()
        span = raw.max() - lowest
        if span:
            # Min-max normalise to 0..1, then scale to a percentage.
            self.map_arr = (raw - lowest) / span * 100
        else:
            # Degenerate case: every sample is equal, so avoid dividing by zero.
            self.map_arr = np.zeros_like(raw)

    def get_val(self):
        """Return the next sample from the table, wrapping around at the end."""
        val = self.map_arr[self.i % len(self.map_arr)]
        self.i += 1
        return val

    def get_cpu_usage_callback(self, observer):
        """Observer callback: report the next synthetic value to *observer*.

        The value is printed and then observed as a float, labelled with the
        module-level ``labels`` plus ``cpu_scope="all"``.
        """
        val = self.get_val()
        clabels = {**labels, "cpu_scope": "all"}
        print(val)
        observer.observe(float(val), clabels)
# Build the synthetic CPU source and register its bound callback as a value
# observer named "cpu_percent".
# NOTE(review): the description says "per-cpu", but the callback reports a
# single series labelled cpu_scope="all".
cpu = CPU()
meter.register_valueobserver(
callback=cpu.get_cpu_usage_callback,
name="cpu_percent",
description="per-cpu usage",
unit="1",
value_type=float,
)
"""
exporter = PrometheusRemoteWriteMetricsExporter(
endpoint="http://localhost:9009/api/prom/push",
timeout=30,
basic_auth={
"username": "user",
"password": "pass123",
},
headers={
"X-Scope-Org-ID": "5",
"Authorization": "Bearer mytoken123",
},
proxies={
"http": "http://10.10.1.10:3000",
"https": "http://10.10.1.10:1080",
},
tls_config={
"cert_file": "path/to/file",
"key_file": "path/to/file",
"ca_file": "path_to_file",
"insecure_skip_verify": True, # for development purposes only
}
)
"""
Examples
https://github.com/open-telemetry/opentelemetry-python-contrib/blob/metrics/exporter/opentelemetry-exporter-prometheus-remote-write/examples/sampleapp.py
import logging
import random
import sys
import time
from logging import INFO
import psutil
from opentelemetry import metrics
from opentelemetry.exporter.prometheus_remote_write import (
PrometheusRemoteWriteMetricsExporter,
)
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export.aggregate import (
HistogramAggregator,
LastValueAggregator,
MinMaxSumCountAggregator,
SumAggregator,
)
from opentelemetry.sdk.metrics.view import View, ViewConfig
# Log to stdout at INFO level so each collection cycle is visible.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)
# Install the global MeterProvider and obtain a meter named after this module.
metrics.set_meter_provider(MeterProvider())
meter = metrics.get_meter(__name__)
# Remote-write exporter pushing to the host "cortex"; the X-Scope-Org-ID
# header is sent with every request.
exporter = PrometheusRemoteWriteMetricsExporter(
endpoint="http://cortex:9009/api/prom/push",
headers={"X-Scope-Org-ID": "5"},
)
# NOTE(review): the third argument (1) is presumably the export interval in
# seconds — confirm against MeterProvider.start_pipeline.
metrics.get_meter_provider().start_pipeline(meter, exporter, 1)
# Labels attached to every value recorded by the load generator.
testing_labels = {"environment": "testing"}
def get_cpu_usage_callback(observer):
    """Observer callback: report the utilisation of each CPU to *observer*.

    One observation is made per entry returned by
    ``psutil.cpu_percent(percpu=True)``, labelled with the entry's index
    (as a string) under the key ``cpu_number``.
    """
    per_cpu = psutil.cpu_percent(percpu=True)
    for cpu_index, usage in enumerate(per_cpu):
        observer.observe(usage, {"cpu_number": str(cpu_index)})
def get_ram_usage_callback(observer):
    """Observer callback: report ``psutil.virtual_memory().percent`` with no labels."""
    observer.observe(psutil.virtual_memory().percent, {})
# --- Synchronous instruments -------------------------------------------------
# Three counters; the views registered further down attach a different
# aggregator to each of them.
requests_counter = meter.create_counter(
name="requests",
description="number of requests",
unit="1",
value_type=int,
)
request_min_max = meter.create_counter(
name="requests_min_max",
description="min max sum count of requests",
unit="1",
value_type=int,
)
request_last_value = meter.create_counter(
name="requests_last_value",
description="last value number of requests",
unit="1",
value_type=int,
)
# Two value recorders; only the second one gets an explicit (histogram) view
# below, the first has no view registered for it in this script.
requests_size = meter.create_valuerecorder(
name="requests_size",
description="size of requests",
unit="1",
value_type=int,
)
requests_size_histogram = meter.create_valuerecorder(
name="requests_size_histogram",
description="histogram of request_size",
unit="1",
value_type=int,
)
# Up-down counter for the number of in-flight requests.
requests_active = meter.create_updowncounter(
name="requests_active",
description="number of active requests",
unit="1",
value_type=int,
)
# --- Asynchronous instruments (values are pulled through callbacks) ----------
# NOTE(review): ram_usage is registered as a *sum* observer although its
# callback reports a percentage — confirm this is intended.
meter.register_sumobserver(
callback=get_ram_usage_callback,
name="ram_usage",
description="ram usage",
unit="1",
value_type=float,
)
meter.register_valueobserver(
callback=get_cpu_usage_callback,
name="cpu_percent",
description="per-cpu usage",
unit="1",
value_type=float,
)
# Each View pairs one instrument with an aggregator class and a label policy.
# NOTE(review): the exact semantics of ViewConfig.LABEL_KEYS / UNGROUPED are
# not visible here — confirm against opentelemetry.sdk.metrics.view.
counter_view1 = View(
requests_counter,
SumAggregator,
label_keys=["environment"],
view_config=ViewConfig.LABEL_KEYS,
)
# NOTE(review): this view keys on "os_type", but the load generator only ever
# supplies an "environment" label.
counter_view2 = View(
request_min_max,
MinMaxSumCountAggregator,
label_keys=["os_type"],
view_config=ViewConfig.LABEL_KEYS,
)
counter_view3 = View(
request_last_value,
LastValueAggregator,
label_keys=["environment"],
view_config=ViewConfig.UNGROUPED,
)
# Histogram view with explicit bucket bounds 20, 40, 60, 80, 100.
size_view = View(
requests_size_histogram,
HistogramAggregator,
label_keys=["environment"],
aggregator_config={"bounds": [20, 40, 60, 80, 100]},
view_config=ViewConfig.UNGROUPED,
)
# Views only take effect once they are registered on the meter.
meter.register_view(counter_view1)
meter.register_view(counter_view2)
meter.register_view(counter_view3)
meter.register_view(size_view)
# Load generator: once per second, derive a value for every instrument from a
# running number and record it with the shared testing labels. Runs forever.
num = random.randint(0, 1000)
while True:
    # Counters and the up-down counter all use add() with the same +200 offset;
    # only the modulus differs per instrument.
    for _instrument, _modulus in (
        (requests_counter, 131),
        (request_min_max, 181),
        (request_last_value, 101),
        (requests_active, 7231),
    ):
        _instrument.add(num % _modulus + 200, testing_labels)

    # Value recorders
    requests_size.record(num % 6101 + 100, testing_labels)
    requests_size_histogram.record(num % 113, testing_labels)

    logger.info("completed metrics collection cycle")
    time.sleep(1)
    num += 9791
Metric types
https://github.com/open-telemetry/opentelemetry-python-contrib/blob/metrics/exporter/opentelemetry-exporter-prometheus-remote-write/examples/sampleapp.py
- counter
- updowncounter
- valuerecorder
- sumobserver
- updownsumobserver
- valueobserver
https://logz.io/blog/python-custom-metrics-remotewrite-sdk/
ValueRecorder
import logging
import random
import sys
import time
from logging import INFO
import json
import time
from opentelemetry import metrics
from opentelemetry.exporter.prometheus_remote_write import (
PrometheusRemoteWriteMetricsExporter,
)
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export.aggregate import (
HistogramAggregator,
LastValueAggregator,
MinMaxSumCountAggregator,
SumAggregator,
)
from opentelemetry.sdk.metrics.view import View, ViewConfig
# Log to stdout at INFO level.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)
# Install the global MeterProvider and obtain a meter named after this module.
metrics.set_meter_provider(MeterProvider())
meter = metrics.get_meter(__name__)
# Push to a local Prometheus remote-write receiver endpoint (/api/v1/write).
exporter = PrometheusRemoteWriteMetricsExporter(
endpoint="http://localhost:9090/api/v1/write",
timeout=30,
)
# NOTE(review): the third argument (1) is presumably the export interval in
# seconds — confirm against MeterProvider.start_pipeline.
metrics.get_meter_provider().start_pipeline(meter, exporter, 1)
# The single value recorder used by the recording loop and by work().
requests_size = meter.create_valuerecorder(
name="requests_size2",
description="size of requests",
unit="1",
value_type=int,
)
# Record the values 0..99, one per second, each under its own "test" label value.
for i in range(100):
    label = dict(test=str(i))
    print("label", label)
    requests_size.record(i, label)
    time.sleep(1)
parallel
def parallel(func, data, nthread=4):
    """Apply *func* to every item of *data* in a pool of worker processes.

    Args:
        func: Picklable callable taking one item of *data*.
        data: Iterable of items to process.
        nthread: Number of workers. Despite the name these are processes
            (``multiprocessing.Pool``), not threads.

    Returns:
        A list with ``func(item)`` for each item, in input order.
    """
    from multiprocessing import Pool

    pool = Pool(nthread)
    try:
        return pool.map(func, data)
    finally:
        # close() lets the workers exit once idle; join() then waits for them
        # so no worker processes are left behind when this function returns.
        pool.close()
        pool.join()
def work(tid):
    """Record one random value (10-20 inclusive) under a per-task label.

    The value that is printed is the value that is recorded, so the console
    output can be compared against what arrives in Prometheus.

    Args:
        tid: Task identifier; becomes the ``tid`` label as ``thread#<tid>``.
    """
    label = {
        "tid": f"thread#{tid}",
    }
    v = random.randint(10, 20)
    print("label", label, v)
    # Record the value just printed rather than drawing a second random number.
    requests_size.record(v, label)
    time.sleep(1)
# Fan 100 work() calls out over 4 workers; the task id cycles through 0-9, so
# each of the ten label values is recorded ten times.
# NOTE(review): parallel() uses worker processes — confirm that values recorded
# there actually reach the exporter pipeline started in the parent process.
parallel(work, [i%10 for i in range(100)], 4)
Remove labels
/usr/local/lib/python3.6/site-packages/opentelemetry/exporter/prometheus_remote_write/__init__.py
Add _last (the last recorded value)
In /usr/local/lib/python3.6/site-packages/opentelemetry/sdk/metrics/export/aggregate.py:
rename the existing MinMaxSumCountAggregator class to MinMaxSumCountAggregator2,
then add the following class to /usr/local/lib/python3.6/site-packages/opentelemetry/sdk/metrics/export/aggregate.py:
class MinMaxSumCountAggregator(Aggregator):
    """Min/max/sum/count aggregation that additionally tracks the last value.

    The min/max/sum/count bookkeeping is delegated to an inner
    ``MinMaxSumCountAggregator2``; this class only adds the ``last`` field to
    the checkpoint tuple.
    """

    _TYPE = namedtuple("minmaxsumcountlast", "min max sum count last")

    def __init__(self, config=None):
        super().__init__(config=config)
        self.mmsc = MinMaxSumCountAggregator2()
        # Most recent value passed to update(); None until the first update.
        self.current = None
        self.checkpoint = self._TYPE(None, None, None, 0, None)

    def update(self, value):
        """Feed *value* to the inner aggregator and remember it as the latest."""
        self.mmsc.update(value)
        self.current = value
        super().update(value)

    def take_checkpoint(self):
        """Checkpoint the inner aggregator and append the latest value to it."""
        self.mmsc.take_checkpoint()
        self.checkpoint = self._TYPE(*self.mmsc.checkpoint, self.current)
        super().take_checkpoint()

    def merge(self, other):
        """Merge *other*'s checkpoint into this one; the newer side supplies ``last``."""
        if not self._verify_type(other):
            return
        self.mmsc.merge(other.mmsc)
        own_last = self.checkpoint.last
        self.last_update_timestamp = max(
            self.last_update_timestamp, other.last_update_timestamp
        )
        # After taking the max, equality means `other` is at least as recent,
        # so its last value wins.
        if self.last_update_timestamp == other.last_update_timestamp:
            merged_last = other.checkpoint.last
        else:
            merged_last = own_last
        self.checkpoint = self._TYPE(*self.mmsc.checkpoint, merged_last)
        # NOTE(review): the pasted snippet lost its indentation; this call is
        # assumed to sit inside the type check — confirm against aggregate.py.
        super().merge(other)
Then add _last to /usr/local/lib/python3.6/site-packages/opentelemetry/exporter/prometheus_remote_write/__init__.py
Deploy receiver
https://prometheus.io/docs/prometheus/latest/storage/#overview
The built-in remote write receiver can be enabled by setting the --web.enable-remote-write-receiver command line flag.
When enabled, the remote write receiver endpoint is /api/v1/write.
Solving the "out of order sample" error
ts=2022-05-14T03:27:27.753Z caller=write_handler.go:102 level=error component=web msg="Out of order sample from remote write" err="out of order sample" series="{__name__=\"cpu_min\", thread=\"t06\"}" timestamp=1652498774953
ts=2022-05-14T03:27:27.757Z caller=write_handler.go:102 level=error component=web msg="Out of order sample from remote write" err="out of order sample" series="{__name__=\"cpu_min\", thread=\"t03\"}" timestamp=1652498774953
ts=2022-05-14T03:27:27.767Z caller=write_handler.go:102 level=error component=web msg="Out of order sample from remote write" err="out of order sample" series="{__name__=\"cpu_min\", thread=\"t07\"}" timestamp=1652498753917
https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/app/vmagent
Download the vmutils-* archive from the releases page, then run vmagent with a single remote-write queue:
./vmagent-prod -remoteWrite.url=http://localhost:9090/api/v1/write -remoteWrite.queues=1