Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 3 additions & 14 deletions pkg/pyroscope/modules_experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ import (
"context"
"fmt"
"slices"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/netutil"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
Expand Down Expand Up @@ -221,6 +219,7 @@ func (f *Pyroscope) initSegmentWriterClient() (_ services.Service, err error) {
logger, f.reg,
f.segmentWriterRing,
placement,
grpc.WithStatsHandler(util.GrpcClientStatsHandler(f.reg)),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -309,6 +308,7 @@ func (f *Pyroscope) initMetastoreClient() (services.Service, error) {
f.logger,
f.Cfg.Metastore.GRPCClientConfig,
disc,
grpc.WithStatsHandler(util.GrpcClientStatsHandler(f.reg)),
)
return f.metastoreClient.Service(), nil
}
Expand Down Expand Up @@ -370,6 +370,7 @@ func (f *Pyroscope) initQueryBackendClient() (services.Service, error) {
f.Cfg.QueryBackend.Address,
f.Cfg.QueryBackend.GRPCClientConfig,
f.Cfg.QueryBackend.ClientTimeout,
grpc.WithStatsHandler(util.GrpcClientStatsHandler(f.reg)),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -448,19 +449,7 @@ func (f *Pyroscope) initHealthServer() (services.Service, error) {
}

func (f *Pyroscope) grpcClientInterceptors() []grpc.UnaryClientInterceptor {
requestDuration := util.RegisterOrGet(f.reg, prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "pyroscope",
Subsystem: "grpc_client",
Name: "request_duration_seconds",
Help: "Time (in seconds) spent waiting for gRPC response.",
Buckets: prometheus.DefBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 50,
NativeHistogramMinResetDuration: time.Hour,
}, []string{"method", "status_code"}))

return []grpc.UnaryClientInterceptor{
middleware.UnaryClientInstrumentInterceptor(requestDuration, middleware.ReportGRPCStatusOption),
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
}
}
88 changes: 88 additions & 0 deletions pkg/util/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package util

import (
"context"
"time"

"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)

type grpcClientStatsKey struct{}

func GrpcClientStatsHandler(reg prometheus.Registerer) stats.Handler {
return &statsHandler{
ElapsedDuration: RegisterOrGet(reg, prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "pyroscope",
Subsystem: "grpc_client",
Name: "request_duration_seconds",
Help: "Time (in seconds) required to send and recieve a gRPC request/response.",

Check failure on line 20 in pkg/util/grpc.go

View workflow job for this annotation

GitHub Actions / lint

`recieve` is a misspelling of `receive` (misspell)
Buckets: prometheus.DefBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 50,
NativeHistogramMinResetDuration: time.Hour,
}, []string{"method", "status_code"})),

RequestDecompressedBytes: RegisterOrGet(reg, prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "pyroscope",
Subsystem: "grpc_client",
Name: "request_body_bytes",
Help: "Number of decompressed bytes in the request body.",
}, []string{"method"})),

ResponseDecompressedBytes: RegisterOrGet(reg, prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "pyroscope",
Subsystem: "grpc_client",
Name: "response_body_bytes",
Help: "Number of decompressed bytes in the response body.",
}, []string{"method"})),
}
}

type statsHandler struct {
ElapsedDuration *prometheus.HistogramVec
RequestDecompressedBytes *prometheus.HistogramVec
ResponseDecompressedBytes *prometheus.HistogramVec
}

func (s *statsHandler) HandleConn(_ context.Context, _ stats.ConnStats) {}

func (s *statsHandler) HandleRPC(ctx context.Context, rpcStats stats.RPCStats) {
if !rpcStats.IsClient() {
return
}

info, ok := ctx.Value(grpcClientStatsKey{}).(*stats.RPCTagInfo)
if !ok {
return
}

switch msg := rpcStats.(type) {
case *stats.InPayload:
s.ResponseDecompressedBytes.With(prometheus.Labels{
"method": info.FullMethodName,
}).Observe(float64(msg.Length))

case *stats.OutPayload:
s.RequestDecompressedBytes.With(prometheus.Labels{
"method": info.FullMethodName,
}).Observe(float64(msg.Length))

case *stats.End:
statusCode, _ := status.FromError(msg.Error)

s.ElapsedDuration.With(prometheus.Labels{
"method": info.FullMethodName,
"status_code": statusCode.Code().String(),
}).Observe(msg.EndTime.Sub(msg.BeginTime).Seconds())
}
}

func (s *statsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return ctx
}

func (s *statsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
return context.WithValue(ctx, grpcClientStatsKey{}, info)
}
Loading