1
1
{-# language DataKinds #-}
2
+ {-# language FlexibleContexts #-}
2
3
{-# language OverloadedStrings #-}
3
4
{-# language PartialTypeSignatures #-}
5
+ {-# language PolyKinds #-}
4
6
{-# language TypeApplications #-}
5
7
{-# OPTIONS_GHC -fno-warn-partial-type-signatures #-}
6
8
@@ -9,34 +11,48 @@ module Main where
9
11
import Control.Concurrent.Async
10
12
import Control.Concurrent.STM
11
13
import Control.Monad.IO.Class
14
+ import Control.Monad.Trace
12
15
import Data.Conduit
13
16
import qualified Data.Conduit.Combinators as C
14
17
import Data.Conduit.TMChan
15
18
import Data.Maybe (fromMaybe )
16
19
import Data.Proxy
17
20
import qualified Data.Text as T
18
21
import DeferredFolds.UnfoldlM
22
+ import Monitor.Tracing.Zipkin (Endpoint (.. ))
19
23
import Network.Wai.Handler.Warp
24
+ import Prometheus
20
25
import qualified StmContainers.Map as M
21
26
22
27
import Mu.GraphQL.Server
23
28
import Mu.GRpc.Server
24
29
import Mu.Instrumentation.Prometheus
30
+ import Mu.Instrumentation.Tracing
25
31
import Mu.Server
26
32
27
33
import Definition
28
34
29
35
main :: IO ()
30
36
main = do
37
+ -- Initialize prometheus
38
+ met <- initPrometheus " health"
39
+ -- Initialize zipkin
40
+ zpk <- newZipkin defaultZipkinSettings
41
+ { settingsPublishPeriod = Just 1
42
+ , settingsEndpoint = Just $ Endpoint (Just " me" ) Nothing Nothing Nothing }
43
+ let rootInfo = MuTracing alwaysSampled " health-check"
44
+ -- Initialize app
31
45
m <- M. newIO
32
46
upd <- newTBMChanIO 100
33
- met <- initPrometheus " health"
47
+ -- Put together the server
48
+ let s = zipkin rootInfo $ prometheus met $ server m upd
49
+ -- Run the app
34
50
putStrLn " running health check application"
35
- let s = prometheus met (server m upd)
36
51
runConcurrently $ (\ _ _ _ -> () )
37
- <$> Concurrently (runner 50051 (gRpcApp msgProtoBuf s))
38
- <*> Concurrently (runner 50052 (gRpcApp msgAvro s))
39
- <*> Concurrently (runner 50053 (graphQLAppQuery s (Proxy @ " HealthCheckServiceFS2" )))
52
+ <$> Concurrently (runner 50051 (gRpcAppTrans msgProtoBuf (runZipkin zpk) s))
53
+ <*> Concurrently (runner 50052 (gRpcAppTrans msgAvro (runZipkin zpk) s))
54
+ <*> Concurrently (runner 50053 (graphQLAppTransQuery (runZipkin zpk) s
55
+ (Proxy @ " HealthCheckServiceFS2" )))
40
56
where runner p app = run p (prometheusWai [" metrics" ] app)
41
57
42
58
-- Server implementation
@@ -45,7 +61,9 @@ main = do
45
61
type StatusMap = M. Map T. Text T. Text
46
62
type StatusUpdates = TBMChan HealthStatusMsg
47
63
48
- server :: StatusMap -> StatusUpdates -> ServerIO info HealthCheckService _
64
+ server :: (MonadServer m , MonadTrace m )
65
+ => StatusMap -> StatusUpdates
66
+ -> ServerT '[] info HealthCheckService m _
49
67
server m upd
50
68
= wrapServer (\ info h -> liftIO (print info) >> h) $
51
69
singleService ( method @ " setStatus" $ setStatus_ m upd
@@ -55,30 +73,36 @@ server m upd
55
73
, method @ " cleanAll" $ cleanAll_ m
56
74
, method @ " watch" $ watch_ upd)
57
75
58
- setStatus_ :: StatusMap -> StatusUpdates -> HealthStatusMsg -> ServerErrorIO ()
76
+ setStatus_ :: (MonadServer m , MonadTrace m )
77
+ => StatusMap -> StatusUpdates -> HealthStatusMsg
78
+ -> m ()
59
79
setStatus_ m upd
60
80
s@ (HealthStatusMsg (Just (HealthCheckMsg nm)) (Just (ServerStatusMsg ss)))
61
- = alwaysOk $ do
81
+ = childSpan " setStatus " $ alwaysOk $ do
62
82
putStr " setStatus: " >> print (nm, ss)
63
83
atomically $ do
64
84
M. insert ss nm m
65
85
writeTBMChan upd s
66
86
setStatus_ _ _ _ = serverError (ServerError Invalid " name or status missing" )
67
87
68
- checkH_ :: StatusMap -> HealthCheckMsg -> ServerErrorIO ServerStatusMsg
88
+ checkH_ :: (MonadServer m , MonadTrace m )
89
+ => StatusMap -> HealthCheckMsg
90
+ -> m ServerStatusMsg
69
91
checkH_ _ (HealthCheckMsg " " ) = serverError (ServerError Invalid " no server name given" )
70
92
checkH_ m (HealthCheckMsg nm) = alwaysOk $ do
71
93
putStr " check: " >> print nm
72
94
ss <- atomically $ M. lookup nm m
73
95
pure $ ServerStatusMsg (fromMaybe " " ss)
74
96
75
- clearStatus_ :: StatusMap -> HealthCheckMsg -> ServerErrorIO ()
97
+ clearStatus_ :: (MonadServer m , MonadTrace m )
98
+ => StatusMap -> HealthCheckMsg -> m ()
76
99
clearStatus_ _ (HealthCheckMsg " " ) = serverError (ServerError Invalid " no server name given" )
77
100
clearStatus_ m (HealthCheckMsg nm) = alwaysOk $ do
78
101
putStr " clearStatus: " >> print nm
79
102
atomically $ M. delete nm m
80
103
81
- checkAll_ :: StatusMap -> ServerErrorIO AllStatusMsg
104
+ checkAll_ :: (MonadServer m , MonadTrace m )
105
+ => StatusMap -> m AllStatusMsg
82
106
checkAll_ m = alwaysOk $ do
83
107
putStrLn " checkAll"
84
108
AllStatusMsg <$> atomically (consumeValues kvToStatus (M. unfoldlM m))
@@ -87,15 +111,17 @@ checkAll_ m = alwaysOk $ do
87
111
consumeValues f = foldlM' (\ xs (x,y) -> pure (f x y: xs)) []
88
112
kvToStatus k v = HealthStatusMsg (Just (HealthCheckMsg k)) (Just (ServerStatusMsg v))
89
113
90
- cleanAll_ :: StatusMap -> ServerErrorIO ()
114
+ cleanAll_ :: (MonadServer m , MonadTrace m )
115
+ => StatusMap -> m ()
91
116
cleanAll_ m = alwaysOk $ do
92
117
putStrLn " cleanAll"
93
118
atomically $ M. reset m
94
119
95
- watch_ :: StatusUpdates
120
+ watch_ :: (MonadServer m , MonadTrace m )
121
+ => StatusUpdates
96
122
-> HealthCheckMsg
97
- -> ConduitT ServerStatusMsg Void ServerErrorIO ()
98
- -> ServerErrorIO ()
123
+ -> ConduitT ServerStatusMsg Void m ()
124
+ -> m ()
99
125
watch_ upd hcm@ (HealthCheckMsg nm) sink = do
100
126
alwaysOk (putStr " watch: " >> print nm)
101
127
runConduit $ sourceTBMChan upd
@@ -109,3 +135,5 @@ watch_ upd hcm@(HealthCheckMsg nm) sink = do
109
135
Just (Just y) -> yield y >> catMaybesC
110
136
Just Nothing -> catMaybesC
111
137
Nothing -> pure ()
138
+
139
+ instance MonadMonitor m => MonadMonitor (TraceT m )
0 commit comments