Carlos Aguni

Highly motivated self-taught IT analyst. Always learning and ready to explore new skills. An eternal apprentice.


OpenTelemetry Study Python

07 Mar 2022 »

https://www.timescale.com/blog/opentelemetry-and-python-a-complete-instrumentation-guide/

https://medium.com/@sayfeddinehammemi/opentracing-in-django-with-jaeger-75598ea93500

Install

opentelemetry-api==1.7.1
opentelemetry-exporter-jaeger==1.7.1
opentelemetry-exporter-jaeger-proto-grpc==1.7.1
opentelemetry-exporter-jaeger-thrift==1.7.1
opentelemetry-instrumentation==0.26b1
opentelemetry-instrumentation-flask==0.26b1
opentelemetry-instrumentation-requests==0.26b1
opentelemetry-instrumentation-wsgi==0.26b1
opentelemetry-sdk==1.7.1
opentelemetry-semantic-conventions==0.26b1
opentelemetry-util-http==0.26b1
opentracing==2.4.0

Basic

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
)
import time
import threading
import random

# Register the global tracer provider; its resource names this service
# "my-helloworld-service".
trace.set_tracer_provider(
    TracerProvider(
        resource=Resource.create({SERVICE_NAME: "my-helloworld-service"}),
    ),
)

# Thrift exporter pointed at a Jaeger agent on localhost:6831.
jaeger_exporter = JaegerExporter(agent_host_name="localhost", agent_port=6831)

# One batching span processor per exporter: finished spans are sent to the
# Jaeger exporter and to the console exporter.
provider = trace.get_tracer_provider()
provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))

tracer = trace.get_tracer(__name__)

# Three nested spans: "foo" encloses "bar", which encloses "baz".
with tracer.start_as_current_span("foo"):
    with tracer.start_as_current_span("bar"):
        with tracer.start_as_current_span("baz"):
            print("Hello world from OpenTelemetry Python!")

Simple Python example

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
)
import time
import threading
import random

# Register the global tracer provider; its resource names this service
# "my-AI-service".
trace.set_tracer_provider(
    TracerProvider(
        resource=Resource.create({SERVICE_NAME: "my-AI-service"}),
    ),
)

# Thrift exporter pointed at a Jaeger agent on host "conda-test", port 6831.
jaeger_exporter = JaegerExporter(agent_host_name="conda-test", agent_port=6831)

# One batching span processor per exporter: finished spans are sent to the
# Jaeger exporter and to the console exporter.
provider = trace.get_tracer_provider()
provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))

tracer = trace.get_tracer(__name__)

def runGPUMatch(match, span):
    """Simulate one GPU matching job for ``match`` underneath ``span``.

    Sleeps for a random 0.05-1.05 s, then makes ``span`` the current span
    and opens a span named "Run GPU is <match>?" inside it that lasts a
    random 1-3 s.

    NOTE(review): presumably the parent span is passed in explicitly because
    the current-span context does not carry over into a new thread — confirm
    against the OpenTelemetry context documentation.
    """
    # Alternatives tried while studying: opening the span directly with
    # start_as_current_span(f"Run GPU {match}", context=context), and
    # trace.use_span(span, end_on_exit=True).
    startup_delay = random.uniform(0,1)+0.05
    time.sleep(startup_delay)
    with trace.use_span(span), tracer.start_as_current_span(f"Run GPU is {match}?"):
        work_time = random.uniform(1,3)
        time.sleep(work_time)

# Root span for the whole flow; the same span object is handed to every
# worker thread so their spans can be opened underneath it.
with tracer.start_as_current_span("Start AI") as span:
    context = span.get_span_context()
    print(context)

    with tracer.start_as_current_span("Receive Image"):
        time.sleep(1)

    # One daemon thread per animal; all are started before any is joined.
    animals = ["Dog", "Cat", "Bird", "Fish"]
    threads = []
    for animal in animals:
        worker = threading.Thread(
            target=runGPUMatch, args=(animal, span), daemon=True
        )
        threads.append(worker)
    for worker in threads:
        worker.start()
    for worker in threads:
        worker.join()

    with tracer.start_as_current_span("To Redis Cache"): # Store cache
        time.sleep(1)

Flask example

https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/flask/flask.html

from flask import Flask
from opentelemetry.instrumentation.flask import FlaskInstrumentor

# Create the Flask application object.
app = Flask(__name__)

# Instrument this specific app instance with the OpenTelemetry Flask
# instrumentor; done before the app starts serving.
FlaskInstrumentor().instrument_app(app)

@app.route("/")
def hello():
    """Handle requests to "/" and return a fixed greeting."""
    return "Hello!"

if __name__ == "__main__":
    # NOTE(review): debug=True also turns on Flask's reloader; the
    # OpenTelemetry docs warn that the reloader can interfere with
    # instrumentation — confirm whether use_reloader=False is needed here.
    app.run(debug=True)

Instrument Airflow with OpenTracing/OpenTelemetry #12771

https://github.com/apache/airflow/issues/12771

def main(cli):
    """Rebuild an OpenTelemetry trace from the recorded data of a DAG run.

    Loads the serialized DAG identified by ``cli.dag_id``, looks up the run
    at ``cli.execution_date``, and emits one root span for the run plus one
    child span per task instance. Span start/end timestamps come from the
    recorded start/end dates, converted with ``dt_to_ns_epoch``.

    Args:
        cli: Object exposing ``dag_id`` and ``execution_date`` attributes
            (presumably parsed command-line arguments — confirm at caller).
    """
    dag = serialized_dag.SerializedDagModel.get(dag_id=cli.dag_id)
    dagrun = dag.dag.get_dagrun(execution_date=cli.execution_date)
    tis = dagrun.get_task_instances()

    root_span = tracer.start_span(
        name=dag.dag.dag_id,
        start_time=dt_to_ns_epoch(dagrun.start_date),
    )
    # The root span is ended immediately with the run's recorded end time;
    # it is still used below as the parent of every task span.
    root_span.end(end_time=dt_to_ns_epoch(dagrun.end_date))

    # Every task span shares the same parent, so build the context once.
    ctx = trace.set_span_in_context(root_span)

    for ti in tis:
        span = tracer.start_span(
            name=ti.task_id,
            context=ctx,
            start_time=dt_to_ns_epoch(ti.start_date),
        )
        span.set_attribute('airflow.state', ti.state)
        span.set_attribute('airflow.operation', ti.operator)
        if ti.job_id is not None:
            span.set_attribute('airflow.job_id', ti.job_id)
        # Any state other than 'success' is flagged as an error on the span.
        if ti.state != 'success':
            span.set_attribute('error', True)
        span.end(end_time=dt_to_ns_epoch(ti.end_date))