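A simplified view of the OpenTelemetry tracing architecture: both the client and the server need to start a TracerProvider, whose main job is to send trace data to a backend (such as Jaeger or OpenCensus). The client and the server link the whole call chain together through context.
The TracerProvider periodically pushes data to the backend through its batch span processor. The default interval is 5s: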
func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
...
o := BatchSpanProcessorOptions{
BatchTimeout: time.Duration(env.BatchSpanProcessorScheduleDelay(DefaultScheduleDelay)) * time.Millisecond,
ExportTimeout: time.Duration(env.BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond,
MaxQueueSize: maxQueueSize,
MaxExportBatchSize: maxExportBatchSize,
}
...
}
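To make the batching behaviour more concrete, here is a minimal standard-library sketch of the idea: buffer spans, then export when the batch is full, when a timer fires, or on shutdown. It is a toy model, not the SDK's implementation; the `batcher` type and all of its methods are hypothetical names invented for this illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// batcher is a toy stand-in for a batch span processor (hypothetical type).
// It buffers span names and hands them to export when the batch fills up,
// when the timer fires, or when Shutdown is called.
type batcher struct {
	mu       sync.Mutex
	buf      []string
	maxBatch int
	export   func(batch []string)
	stop     chan struct{}
	done     chan struct{}
}

func newBatcher(maxBatch int, timeout time.Duration, export func([]string)) *batcher {
	b := &batcher{
		maxBatch: maxBatch,
		export:   export,
		stop:     make(chan struct{}),
		done:     make(chan struct{}),
	}
	go b.loop(timeout)
	return b
}

// loop flushes whatever is buffered every timeout until Shutdown is called.
func (b *batcher) loop(timeout time.Duration) {
	defer close(b.done)
	ticker := time.NewTicker(timeout)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			b.flush()
		case <-b.stop:
			return
		}
	}
}

// flushLocked exports the buffered spans, if any. The caller must hold b.mu.
func (b *batcher) flushLocked() {
	if len(b.buf) == 0 {
		return
	}
	b.export(b.buf)
	b.buf = nil
}

func (b *batcher) flush() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.flushLocked()
}

// Add buffers one span and exports early once the batch is full.
func (b *batcher) Add(span string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.buf = append(b.buf, span)
	if len(b.buf) >= b.maxBatch {
		b.flushLocked()
	}
}

// Shutdown stops the timer loop and exports anything still buffered.
func (b *batcher) Shutdown() {
	close(b.stop)
	<-b.done
	b.flush()
}

func main() {
	b := newBatcher(2, 5*time.Second, func(batch []string) {
		fmt.Println("exporting", batch)
	})
	for _, s := range []string{"a", "b", "c", "d", "e"} {
		b.Add(s)
	}
	b.Shutdown()
}
```

The final flush in `Shutdown` is the reason a real program should shut its provider down before exiting: anything still sitting in the buffer would otherwise be lost.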
Below is the official SDK. It implements the OpenTelemetry API and is the basic library for working with OpenTelemetry:
tracesdk "go.opentelemetry.io/otel/sdk/trace"
To use tracing, first create a TracerProvider and define its exporter and related attributes.
The argument is the name of the application or package:
var tracer = otel.Tracer("app_or_package_name")
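The following shows a TracerProvider that uses Jaeger as its exporter. It involves two concepts: exporter and resource. The exporter is the destination that telemetry data is sent to, such as Jaeger, Zipkin, or OpenCensus. The resource is typically used to attach long-lived metadata about the underlying environment, such as the host name or instance ID.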
// tracerProvider returns an OpenTelemetry TracerProvider configured to use
// the Jaeger exporter that will send spans to the provided url. The returned
// TracerProvider will also use a Resource configured with all the information
// about the application.
func tracerProvider(url string) (*tracesdk.TracerProvider, error) {
// Create the Jaeger exporter
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
if err != nil {
return nil, err
}
tp := tracesdk.NewTracerProvider(
// Always be sure to batch in production.
tracesdk.WithBatcher(exp),
// Record information about this application in a Resource.
tracesdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(service),
attribute.String("environment", environment),
attribute.Int64("ID", id),
)),
)
return tp, nil
}
A resource can be created as follows. The semconv package provides standardized names for resource attributes.
// newResource returns a resource describing this application.
func newResource() *resource.Resource {
r, _ := resource.Merge(
resource.Default(),
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("fib"),
semconv.ServiceVersionKey.String("v0.1.0"),
attribute.String("environment", "demo"),
),
)
return r
}
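As a rough mental model of what merging the default resource with a custom one does, the sketch below merges two plain attribute maps so that the second one wins when a key appears in both. This is only an illustration under that assumption: `mergeAttrs` is a hypothetical helper, the default values shown are made up for the example, and the real merge also deals with schema URLs and can return an error.

```go
package main

import "fmt"

// mergeAttrs models merging two attribute sets (hypothetical helper):
// every key from a is kept, and keys from b overwrite keys from a.
func mergeAttrs(a, b map[string]string) map[string]string {
	out := make(map[string]string, len(a)+len(b))
	for k, v := range a {
		out[k] = v
	}
	for k, v := range b {
		out[k] = v
	}
	return out
}

func main() {
	// Illustrative defaults; the real default resource may contain other keys.
	defaults := map[string]string{
		"service.name":           "unknown_service",
		"telemetry.sdk.language": "go",
	}
	custom := map[string]string{
		"service.name":    "fib",
		"service.version": "v0.1.0",
		"environment":     "demo",
	}
	for k, v := range mergeAttrs(defaults, custom) {
		fmt.Printf("%s=%s\n", k, v)
	}
}
```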
If you use a custom TracerProvider, register it as the global TracerProvider:
tp, err := tracerProvider("http://localhost:14268/api/traces")
if err != nil {
log.Fatal(err)
}
// Register our TracerProvider as the global so any imported
// instrumentation in the future will default to using it.
otel.SetTracerProvider(tp)
tr := tp.Tracer("component-main")
ctx, span := tr.Start(ctx, "foo")
defer span.End()
Before the program exits, shut down the TracerProvider so that it can flush and clean up its data:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Cleanly shutdown and flush telemetry when the application exits.
defer func(ctx context.Context) {
// Do not make the application hang when it is shutdown.
ctx, cancel = context.WithTimeout(ctx, time.Second*5)
defer cancel()
if err := tp.Shutdown(ctx); err != nil {
log.Fatal(err)
}
}(ctx)
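The timeout on the shutdown context exists so that an unreachable backend cannot keep the process from exiting. The following standard-library sketch shows that pattern in isolation. `shutdownWithin`, `fastFlush`, and `stuckFlush` are hypothetical names standing in for the provider's shutdown and for a backend that does or does not respond.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// shutdownWithin runs flush but gives up once the timeout d has elapsed.
func shutdownWithin(d time.Duration, flush func(ctx context.Context) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()

	done := make(chan error, 1) // buffered so the goroutine never blocks
	go func() { done <- flush(ctx) }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// fastFlush pretends the backend answered immediately.
func fastFlush(ctx context.Context) error { return nil }

// stuckFlush pretends the backend never answers; it returns only when ctx ends.
func stuckFlush(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	fmt.Println("fast flush error:", shutdownWithin(time.Second, fastFlush))

	err := shutdownWithin(50*time.Millisecond, stuckFlush)
	fmt.Println("stuck flush timed out:", errors.Is(err, context.DeadlineExceeded))
}
```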
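A tracer creates spans, and creating a span requires a context.Context instance. That context usually comes from a request object or from an existing parent span. Go's context stores the active span: once a span has been started, you can work with the new span and with the modified context that contains it. After a span ends, it becomes immutable.
The following starts a span from the context of a request: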
func httpHandler(w http.ResponseWriter, r *http.Request) {
ctx, span := tracer.Start(r.Context(), "hello-span")
defer span.End()
// do some work to track with hello-span
}
An already active span can also be retrieved from a context:
// This context needs to contain the active span you plan to extract.
ctx := context.TODO()
span := trace.SpanFromContext(ctx)
// Do something with the current span, optionally calling `span.End()` if you want it to end
Below, childSpan is nested inside parentSpan, which represents sequential execution:
func parentFunction(ctx context.Context) {
ctx, parentSpan := tracer.Start(ctx, "parent")
defer parentSpan.End()
// call the child function and start a nested span in there
childFunction(ctx)
// do more work - when this function ends, parentSpan will complete.
}
func childFunction(ctx context.Context) {
// Create a span to track `childFunction()` - this is a nested span whose parent is `parentSpan`
ctx, childSpan := tracer.Start(ctx, "child")
defer childSpan.End()
// do work here, when this function returns, childSpan will complete.
}
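The parent/child relationship comes entirely from which context is passed to the start call. The standard-library sketch below models that mechanism: a span is stored in the context under a private key, and a new span looks there for its parent. The `span` struct, `start`, and `demo` are hypothetical and much simpler than the real API (integer IDs, no timing, not goroutine-safe).

```go
package main

import (
	"context"
	"fmt"
)

// span is a toy span that only tracks identity and parentage.
type span struct {
	name     string
	traceID  int
	spanID   int
	parentID int // 0 means this is a root span
}

// spanKey is the private context key under which the active span is stored.
type spanKey struct{}

var nextID int // not goroutine-safe; enough for a single-threaded sketch

// start creates a span whose parent is the span found in ctx, if any,
// and returns a new context carrying the new span.
func start(ctx context.Context, name string) (context.Context, *span) {
	nextID++
	s := &span{name: name, spanID: nextID}
	if parent, ok := ctx.Value(spanKey{}).(*span); ok {
		s.traceID, s.parentID = parent.traceID, parent.spanID
	} else {
		s.traceID = nextID // no parent: begin a new trace
	}
	return context.WithValue(ctx, spanKey{}, s), s
}

// demo starts one parent and two spans from the parent's context.
func demo() (*span, *span, *span) {
	ctx, parent := start(context.Background(), "parent")
	_, child := start(ctx, "child")
	_, sibling := start(ctx, "sibling")
	return parent, child, sibling
}

func main() {
	parent, child, sibling := demo()
	for _, s := range []*span{parent, child, sibling} {
		fmt.Printf("%-8s trace=%d span=%d parent=%d\n", s.name, s.traceID, s.spanID, s.parentID)
	}
}
```

Because `child` and `sibling` are both started from the parent's context, they share the same parent. Passing the context returned for `child` instead would nest the next span one level deeper.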
Attributes are key/value metadata used to aggregate, filter, and group traces.
// setting attributes at creation...
ctx, span = tracer.Start(ctx, "attributesAtCreation", trace.WithAttributes(attribute.String("hello", "world")))
// ... and after creation
span.SetAttributes(attribute.Bool("isTrue", true), attribute.String("stringAttr", "hi!"))
Attribute keys can also be defined in advance and then added to a span:
var myKey = attribute.Key("myCoolAttribute")
span.SetAttributes(myKey.String("a value"))
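Note: trace attributes cannot be defined arbitrarily. They are subject to specific constraints; see the official conventions and the constraints summarized by uptrace.
An event is a human-readable message indicating that "something happened" during the lifetime of a span. For example, suppose a function needs to acquire a lock to access a mutually exclusive resource. Events can be created at two points: when the function tries to access the resource, and when it has acquired the lock. For example: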
span.AddEvent("Acquiring lock")
mutex.Lock()
span.AddEvent("Got lock, doing work...")
// do stuff
span.AddEvent("Unlocking")
mutex.Unlock()
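A useful property of events is that their timestamps are displayed as offsets from the start of the span (derived from the actual time at which each event occurred), which makes it easy to see how much time passed between them.
Events can also carry attributes: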
span.AddEvent("Cancelled wait due to external signal", trace.WithAttributes(attribute.Int("pid", 4328), attribute.String("signal", "SIGHUP")))
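The offset display mentioned above can be pictured with a small standard-library model: each event stores the absolute time at which it happened, and the offset is that time minus the span's start time. The `span` and `event` types and the timestamps below are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"time"
)

// event records a message and the absolute time at which it happened.
type event struct {
	name string
	at   time.Time
}

// span is a toy span with a start time and a list of events.
type span struct {
	start  time.Time
	events []event
}

func (s *span) addEvent(name string, at time.Time) {
	s.events = append(s.events, event{name: name, at: at})
}

// offsets returns how long after the span start each event happened.
func (s *span) offsets() []time.Duration {
	out := make([]time.Duration, len(s.events))
	for i, e := range s.events {
		out[i] = e.at.Sub(s.start)
	}
	return out
}

// demoSpan builds a span with fixed, made-up timestamps.
func demoSpan() *span {
	start := time.Date(2022, 6, 21, 12, 0, 0, 0, time.UTC)
	s := &span{start: start}
	s.addEvent("Acquiring lock", start.Add(2*time.Millisecond))
	s.addEvent("Got lock, doing work...", start.Add(15*time.Millisecond))
	s.addEvent("Unlocking", start.Add(40*time.Millisecond))
	return s
}

func main() {
	s := demoSpan()
	for i, off := range s.offsets() {
		fmt.Printf("+%v\t%s\n", off, s.events[i].name)
	}
}
```

Subtracting two neighbouring offsets gives the time spent between events, for example how long the function waited for the lock.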
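A span's status is typically used to indicate whether an operation failed. The default status is Unset. It can be set to Ok manually, but that is usually unnecessary.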
result, err := operationThatCouldFail()
if err != nil {
span.SetStatus(codes.Error, "operationThatCouldFail failed")
}
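RecordError is used to record information such as the error message or call stack. It is strongly recommended to also set the span status to Error with SetStatus whenever RecordError is used: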
result, err := operationThatCouldFail()
if err != nil {
span.SetStatus(codes.Error, "operationThatCouldFail failed")
span.RecordError(err)
}
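The reason both calls are recommended is that recording an error only attaches an event to the span and does not change its status. The standard-library model below illustrates that separation. The `span` type, the status constants, and the `fail` helper are hypothetical and only mimic the shape of the real API.

```go
package main

import (
	"errors"
	"fmt"
)

type statusCode int

const (
	statusUnset statusCode = iota
	statusError
	statusOK
)

// span is a toy span holding a status and a list of event messages.
type span struct {
	status statusCode
	desc   string
	events []string
}

// recordError only appends an exception event; the status is left untouched.
func (s *span) recordError(err error) {
	if err != nil {
		s.events = append(s.events, "exception: "+err.Error())
	}
}

func (s *span) setStatus(c statusCode, desc string) {
	s.status, s.desc = c, desc
}

// fail is a hypothetical helper that always performs both steps together.
func fail(s *span, err error, desc string) {
	s.recordError(err)
	s.setStatus(statusError, desc)
}

var errBoom = errors.New("boom")

func main() {
	s := &span{}
	s.recordError(errBoom)
	fmt.Println("status still unset:", s.status == statusUnset, s.events)

	fail(s, errBoom, "operationThatCouldFail failed")
	fmt.Println("status is error:", s.status == statusError)
}
```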
Below, trace information is generated for a local function, bar:
func tracerProvider(url string) (*tracesdk.TracerProvider, error) {
// Create the Jaeger exporter
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
if err != nil {
return nil, err
}
tp := tracesdk.NewTracerProvider(
// Always be sure to batch in production.
tracesdk.WithBatcher(exp),
// Record information about this application in a Resource.
tracesdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(service),
attribute.String("environment", environment),
attribute.Int64("ID", id),
)),
)
return tp, nil
}
func main() {
tp, err := tracerProvider("http://localhost:14268/api/traces")
if err != nil {
log.Fatal(err)
}
// Register our TracerProvider as the global so any imported
// instrumentation in the future will default to using it.
otel.SetTracerProvider(tp)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Cleanly shutdown and flush telemetry when the application exits.
defer func(ctx context.Context) {
// Do not make the application hang when it is shutdown.
ctx, cancel = context.WithTimeout(ctx, time.Second*5)
defer cancel()
if err := tp.Shutdown(ctx); err != nil {
log.Fatal(err)
}
}(ctx)
tr := tp.Tracer("component-main")
ctx, span := tr.Start(ctx, "foo")
defer span.End()
bar(ctx)
}
func bar(ctx context.Context) {
// Use the global TracerProvider.
tr := otel.Tracer("component-bar")
_, span := tr.Start(ctx, "bar")
span.SetAttributes(attribute.Key("testset").String("value"))
defer span.End()
// Do bar...
}
To propagate trace context across services, a propagator must be registered, usually right after the TracerProvider has been created and registered.
func initTracer() (*sdktrace.TracerProvider, error) {
// Create stdout exporter to be able to retrieve
// the collected spans.
exporter, err := stdout.New(stdout.WithPrettyPrint())
if err != nil {
return nil, err
}
// For the demonstration, use sdktrace.AlwaysSample sampler to sample all traces.
// In a production application, use sdktrace.TraceIDRatioBased with a desired ratio.
tp := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithBatcher(exporter),
)
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return tp, err
}
The code above registers two propagators, TraceContext and Baggage, so context can be propagated in both of these formats.
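The TraceContext propagator carries the span context in a `traceparent` HTTP header of the form `version-traceid-parentid-flags`. As a simplified standard-library sketch of what injecting and extracting that header involves (not the SDK's implementation; `spanContext`, `inject`, and `extract` are hypothetical names, and only version `00` is handled):

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

// spanContext holds the parts of a span's identity that cross the wire.
type spanContext struct {
	traceID [16]byte
	spanID  [8]byte
	sampled bool
}

// inject renders sc as a traceparent value: version-traceid-parentid-flags.
func inject(sc spanContext) string {
	flags := "00"
	if sc.sampled {
		flags = "01"
	}
	return "00-" + hex.EncodeToString(sc.traceID[:]) + "-" +
		hex.EncodeToString(sc.spanID[:]) + "-" + flags
}

// extract parses a version-00 traceparent value. It reports false for
// malformed input and for all-zero IDs, which are not valid.
func extract(h string) (spanContext, bool) {
	var sc spanContext
	parts := strings.Split(h, "-")
	if len(parts) != 4 || parts[0] != "00" ||
		len(parts[1]) != 32 || len(parts[2]) != 16 || len(parts[3]) != 2 {
		return sc, false
	}
	tid, err1 := hex.DecodeString(parts[1])
	sid, err2 := hex.DecodeString(parts[2])
	fl, err3 := hex.DecodeString(parts[3])
	if err1 != nil || err2 != nil || err3 != nil {
		return sc, false
	}
	copy(sc.traceID[:], tid)
	copy(sc.spanID[:], sid)
	if sc.traceID == ([16]byte{}) || sc.spanID == ([8]byte{}) {
		return spanContext{}, false
	}
	sc.sampled = fl[0]&0x01 == 1
	return sc, true
}

func main() {
	sc := spanContext{sampled: true}
	sc.traceID[15], sc.spanID[7] = 0x01, 0x02

	header := inject(sc)
	fmt.Println("traceparent:", header)

	got, ok := extract(header)
	fmt.Println("round trip ok:", ok && got == sc)
}
```

The client writes this header on the outgoing request and the server reads it back, which is how a span on the server ends up in the same trace as the client span.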
Below is server-side code for gorilla/mux. It obtains the span from the request context with trace.SpanFromContext(r.Context()). Alternatively, a new span can be started with tracer.Start(r.Context(), "getUser", oteltrace.WithAttributes(attribute.String("id", id))):
func TestPropagationWithCustomPropagators(t *testing.T) {
prop := propagation.TraceContext{}
r := httptest.NewRequest("GET", "/user/123", nil)
w := httptest.NewRecorder()
// sc is a trace.SpanContext defined elsewhere in the test file (not shown here).
ctx := trace.ContextWithRemoteSpanContext(context.Background(), sc)
prop.Inject(ctx, propagation.HeaderCarrier(r.Header))
var called bool
router := mux.NewRouter()
router.Use(Middleware("foobar", WithPropagators(prop)))
router.HandleFunc("/user/{id}", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
called = true
span := trace.SpanFromContext(r.Context())
defer span.End()
assert.Equal(t, sc, span.SpanContext())
w.WriteHeader(http.StatusOK)
}))
router.ServeHTTP(w, r)
assert.True(t, called, "failed to run test")
}
Below is client and server code that uses baggage. Note that the client needs to use otelhttp.
Client code:
package main
import (
"context"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"time"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/baggage"
stdout "go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
"go.opentelemetry.io/otel/trace"
)
func initTracer() (*sdktrace.TracerProvider, error) {
// Create stdout exporter to be able to retrieve
// the collected spans.
exporter, err := stdout.New(stdout.WithPrettyPrint())
if err != nil {
return nil, err
}
// For the demonstration, use sdktrace.AlwaysSample sampler to sample all traces.
// In a production application, use sdktrace.TraceIDRatioBased with a desired ratio.
tp := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithBatcher(exporter),
)
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return tp, err
}
func main() {
tp, err := initTracer()
if err != nil {
log.Fatal(err)
}
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Printf("Error shutting down tracer provider: %v", err)
}
}()
url := flag.String("server", "http://localhost:7777/hello", "server url")
flag.Parse()
client := http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport)}
bag, _ := baggage.Parse("username=donuts")
ctx := baggage.ContextWithBaggage(context.Background(), bag)
var body []byte
tr := otel.Tracer("example/client")
err = func(ctx context.Context) error {
ctx, span := tr.Start(ctx, "say hello", trace.WithAttributes(semconv.PeerServiceKey.String("ExampleService")))
defer span.End()
req, _ := http.NewRequestWithContext(ctx, "GET", *url, nil)
fmt.Printf("Sending request...\n")
res, err := client.Do(req)
if err != nil {
panic(err)
}
body, err = ioutil.ReadAll(res.Body)
_ = res.Body.Close()
return err
}(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Response Received: %s\n\n\n", body)
fmt.Printf("Waiting for few seconds to export spans ...\n\n")
time.Sleep(10 * time.Second)
fmt.Printf("Inspect traces on stdout\n")
}
Server code:
package main
import (
"context"
"io"
"log"
"net/http"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/baggage"
stdout "go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
"go.opentelemetry.io/otel/trace"
)
func initTracer() (*sdktrace.TracerProvider, error) {
// Create stdout exporter to be able to retrieve
// the collected spans.
exporter, err := stdout.New(stdout.WithPrettyPrint())
if err != nil {
return nil, err
}
// For the demonstration, use sdktrace.AlwaysSample sampler to sample all traces.
// In a production application, use sdktrace.TraceIDRatioBased with a desired ratio.
tp := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(resource.NewWithAttributes(semconv.SchemaURL, semconv.ServiceNameKey.String("ExampleService"))),
)
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return tp, err
}
func main() {
tp, err := initTracer()
if err != nil {
log.Fatal(err)
}
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Printf("Error shutting down tracer provider: %v", err)
}
}()
uk := attribute.Key("username")
helloHandler := func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
span := trace.SpanFromContext(ctx) // this is the Hello span
defer span.End()
bag := baggage.FromContext(ctx)
span.AddEvent("handling this...", trace.WithAttributes(uk.String(bag.Member("username").Value())))
_, _ = io.WriteString(w, "Hello, world!\n")
}
// otelhttp.NewHandler creates a span named Hello while handling the request
otelHandler := otelhttp.NewHandler(http.HandlerFunc(helloHandler), "Hello")
http.Handle("/hello", otelHandler)
err = http.ListenAndServe(":7777", nil)
if err != nil {
log.Fatal(err)
}
}
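In the trace produced by the code above, the client's HTTP GET span calls the server's Hello span. The server's Hello span is created while the request is being handled. This example uses otelhttp; other instrumentation libraries handle this in a similar way.
The following code starts two independent spans, which can represent two parallel tasks: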
helloHandler := func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
ctx, span1 := tracer.Start(ctx, "span1 process", trace.WithLinks())
defer span1.End()
bag := baggage.FromContext(req.Context())
span1.SetAttributes(attribute.String("span1", "test1"))
span1.AddEvent("span1 handling this...", trace.WithAttributes(uk.String(bag.Member("username").Value())))
ctx, span2 := tracer.Start(req.Context(), "span2 process", trace.WithLinks())
defer span2.End()
span2.SetAttributes(attribute.String("span2", "test2"))
span2.AddEvent("span2 handling this...", trace.WithAttributes(uk.String(bag.Member("username").Value())))
_, _ = io.WriteString(w, "Hello, world!\n")
}
Baggage can also be built in other ways, for example with baggage.NewKeyValueProperty("key", "value").
Note: baggage must follow the W3C Baggage specification.
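On the wire, baggage travels as a `baggage` header containing comma-separated `key=value` members, each optionally followed by `;`-separated properties, with values percent-encoded. The standard-library sketch below parses such a header in a deliberately simplified way. `parseBaggage` is a hypothetical helper that drops properties and skips malformed members, and it does not enforce the limits defined by the specification.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// parseBaggage splits a baggage header value into key/value pairs.
// Properties after ';' are dropped and malformed members are skipped.
func parseBaggage(header string) map[string]string {
	out := map[string]string{}
	for _, member := range strings.Split(header, ",") {
		kv, _, _ := strings.Cut(member, ";") // strip optional properties
		k, v, ok := strings.Cut(kv, "=")
		k = strings.TrimSpace(k)
		if !ok || k == "" {
			continue
		}
		val, err := url.PathUnescape(strings.TrimSpace(v))
		if err != nil {
			continue
		}
		out[k] = val
	}
	return out
}

func main() {
	bag := parseBaggage("username=donuts, region=eu%2Dwest;ttl=1")
	fmt.Println("username:", bag["username"])
	fmt.Println("region:", bag["region"])
}
```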
The project provides instrumentation for many libraries, such as Gorilla Mux, GORM, Gin-gonic, and gRPC. See the official repository for more.
Sampling is configured on the TracerProvider. For example, the following samples every trace:
provider := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
)
With a ratio-based sampler, a value of .5 means that half of the traces are collected. In production, consider using TraceIDRatioBased and ParentBased.
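To show how a ratio-based decision and a parent-based decision can fit together, here is a standard-library sketch. It assumes the decision is derived deterministically from the trace ID, so that every service seeing the same trace ID reaches the same answer. `ratioSampled` and `parentBased` are hypothetical functions written for this illustration; the real samplers offer more options.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// ratioSampled decides from the trace ID alone: the low 8 bytes are read
// as a number and compared against a bound proportional to the ratio.
func ratioSampled(traceID [16]byte, ratio float64) bool {
	if ratio <= 0 {
		return false
	}
	if ratio >= 1 {
		return true
	}
	bound := uint64(ratio * (1 << 63))
	x := binary.BigEndian.Uint64(traceID[8:16]) >> 1
	return x < bound
}

// parentBased follows the parent's decision when there is a parent and
// falls back to the ratio for root spans.
func parentBased(hasParent, parentSampled bool, traceID [16]byte, ratio float64) bool {
	if hasParent {
		return parentSampled
	}
	return ratioSampled(traceID, ratio)
}

func main() {
	low := [16]byte{0: 1} // low 8 bytes are all zero
	high := [16]byte{0: 1, 8: 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}

	fmt.Println("low ID at 0.5:", ratioSampled(low, 0.5))
	fmt.Println("high ID at 0.5:", ratioSampled(high, 0.5))
	fmt.Println("high ID with sampled parent:", parentBased(true, true, high, 0.5))
}
```

Following the parent's decision keeps a trace whole: either all of its spans are collected or none are.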
This article comes from cnblogs (部落格園). Author: charlieroro. When reposting, please cite the original link: https://www.cnblogs.com/charlieroro/p/16396993.html