Skip to content
Open

Mmmm #11

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflow/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name: product-catalog-service-ci
on:
pull request:
branches:
- main
- main

jobs:
build:
Expand Down
237 changes: 5 additions & 232 deletions src/product-catalog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,265 +59,38 @@ func init() {
catalog, err = readProductFiles()
if err != nil {
log.Fatalf("Reading Product Files: %v", err)




os.Exit(1)
}
}

func initResource() *sdkresource.Resource {
initResourcesOnce.Do(func() {
extraResources, _ := sdkresource.New(
context.Background(),
sdkresource.WithOS(),
sdkresource.WithProcess(),
sdkresource.WithContainer(),
sdkresource.WithHost(),
)
resource, _ = sdkresource.Merge(
sdkresource.Default(),
extraResources,
)
})
return resource
}

func initTracerProvider() *sdktrace.TracerProvider {
ctx := context.Background()

exporter, err := otlptracegrpc.New(ctx)
if err != nil {
log.Fatalf("OTLP Trace gRPC Creation: %v", err)
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(initResource()),
)
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return tp
}

func initMeterProvider() *sdkmetric.MeterProvider {
ctx := context.Background()

exporter, err := otlpmetricgrpc.New(ctx)
if err != nil {
log.Fatalf("new otlp metric grpc exporter failed: %v", err)
}

mp := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter)),
sdkmetric.WithResource(initResource()),
)
otel.SetMeterProvider(mp)
return mp
}

func main() {
tp := initTracerProvider()
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Fatalf("Tracer Provider Shutdown: %v", err)
}
log.Println("Shutdown tracer provider")
}()

mp := initMeterProvider()
defer func() {
if err := mp.Shutdown(context.Background()); err != nil {
log.Fatalf("Error shutting down meter provider: %v", err)
}
log.Println("Shutdown meter provider")
}()
openfeature.AddHooks(otelhooks.NewTracesHook())
err := openfeature.SetProvider(flagd.NewProvider())
if err != nil {
log.Fatal(err)
}

err = runtime.Start(runtime.WithMinimumReadMemStatsInterval(time.Second))
if err != nil {
log.Fatal(err)
}

svc := &productCatalog{}
var port string
mustMapEnv(&port, "PRODUCT_CATALOG_PORT")

log.Infof("Product Catalog gRPC server started on port: %s", port)

ln, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
if err != nil {
log.Fatalf("TCP Listen: %v", err)
}

srv := grpc.NewServer(
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)

reflection.Register(srv)

pb.RegisterProductCatalogServiceServer(srv, svc)
healthpb.RegisterHealthServer(srv, svc)

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGKILL)
defer cancel()

go func() {
if err := srv.Serve(ln); err != nil {
log.Fatalf("Failed to serve gRPC server, err: %v", err)
}
}()

<-ctx.Done()

srv.GracefulStop()
log.Println("Product Catalog gRPC server stopped")
}

type productCatalog struct {
pb.UnimplementedProductCatalogServiceServer
}

func readProductFiles() ([]*pb.Product, error) {

// find all .json files in the products directory
entries, err := os.ReadDir("./products")
if err != nil {
return nil, err
}

jsonFiles := make([]fs.FileInfo, 0, len(entries))
for _, entry := range entries {
if strings.HasSuffix(entry.Name(), ".json") {
info, err := entry.Info()
if err != nil {
return nil, err
}
jsonFiles = append(jsonFiles, info)
}
}

// read the contents of each .json file and unmarshal into a ListProductsResponse
// then append the products to the catalog
var products []*pb.Product
for _, f := range jsonFiles {
jsonData, err := os.ReadFile("./products/" + f.Name())
if err != nil {
return nil, err
}

var res pb.ListProductsResponse
if err := protojson.Unmarshal(jsonData, &res); err != nil {
return nil, err
}

products = append(products, res.Products...)
}

log.Infof("Loaded %d products", len(products))

return products, nil
}

func mustMapEnv(target *string, key string) {
value, present := os.LookupEnv(key)
if !present {
log.Fatalf("Environment Variable Not Set: %q", key)
}
*target = value
}

func (p *productCatalog) Check(ctx context.Context, req *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
return &healthpb.HealthCheckResponse{Status: healthpb.HealthCheckResponse_SERVING}, nil
}

func (p *productCatalog) Watch(req *healthpb.HealthCheckRequest, ws healthpb.Health_WatchServer) error {
return status.Errorf(codes.Unimplemented, "health check via Watch not implemented")
}

func (p *productCatalog) ListProducts(ctx context.Context, req *pb.Empty) (*pb.ListProductsResponse, error) {
span := trace.SpanFromContext(ctx)

span.SetAttributes(
attribute.Int("app.products.count", len(catalog)),
)
return &pb.ListProductsResponse{Products: catalog}, nil
}

func (p *productCatalog) GetProduct(ctx context.Context, req *pb.GetProductRequest) (*pb.Product, error) {
span := trace.SpanFromContext(ctx)
span.SetAttributes(
attribute.String("app.product.id", req.Id),
)

// GetProduct will fail on a specific product when feature flag is enabled
if p.checkProductFailure(ctx, req.Id) {
msg := fmt.Sprintf("Error: Product Catalog Fail Feature Flag Enabled")
span.SetStatus(otelcodes.Error, msg)
span.AddEvent(msg)
return nil, status.Errorf(codes.Internal, msg)
}

var found *pb.Product
for _, product := range catalog {
if req.Id == product.Id {
found = product
break
}
}

if found == nil {
msg := fmt.Sprintf("Product Not Found: %s", req.Id)
span.SetStatus(otelcodes.Error, msg)
span.AddEvent(msg)
return nil, status.Errorf(codes.NotFound, msg)
}

msg := fmt.Sprintf("Product Found - ID: %s, Name: %s", req.Id, found.Name)
span.AddEvent(msg)
span.SetAttributes(
attribute.String("app.product.name", found.Name),
)
return found, nil
}

func (p *productCatalog) SearchProducts(ctx context.Context, req *pb.SearchProductsRequest) (*pb.SearchProductsResponse, error) {
span := trace.SpanFromContext(ctx)

var result []*pb.Product
for _, product := range catalog {
if strings.Contains(strings.ToLower(product.Name), strings.ToLower(req.Query)) ||
strings.Contains(strings.ToLower(product.Description), strings.ToLower(req.Query)) {
result = append(result, product)
}
}
span.SetAttributes(
attribute.Int("app.products_search.count", len(result)),
)
return &pb.SearchProductsResponse{Results: result}, nil
}

func (p *productCatalog) checkProductFailure(ctx context.Context, id string) bool {
if id != "OLJCESPC7Z" {
return false
}

client := openfeature.NewClient("productCatalog")
failureEnabled, _ := client.BooleanValue(
ctx, "productCatalogFailure", false, openfeature.EvaluationContext{},
)
return failureEnabled
}

func createClient(ctx context.Context, svcAddr string) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, svcAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),


)
}





Expand Down