Skip to content

Commit 8b879fb

Browse files
authored
add EntityResource.makeK8sPod (#5783)
1 parent 597eac0 commit 8b879fb

File tree

9 files changed

+314
-88
lines changed

9 files changed

+314
-88
lines changed

.changeset/solid-geese-pump.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@effect/platform-node": patch
3+
"@effect/cluster": patch
4+
---
5+
6+
add EntityResource.makeK8sPod

packages/cluster/package.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,5 +51,8 @@
5151
"@testcontainers/mysql": "^10.25.0",
5252
"@testcontainers/postgresql": "^10.25.0",
5353
"effect": "workspace:^"
54+
},
55+
"dependencies": {
56+
"kubernetes-types": "^1.30.0"
5457
}
5558
}

packages/cluster/src/EntityResource.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import * as Effect from "effect/Effect"
77
import { identity } from "effect/Function"
88
import * as RcRef from "effect/RcRef"
99
import * as Scope from "effect/Scope"
10+
import type * as v1 from "kubernetes-types/core/v1.d.ts"
1011
import * as Entity from "./Entity.js"
12+
import * as K8sHttpClient from "./K8sHttpClient.js"
1113
import type { Sharding } from "./Sharding.js"
1214

1315
/**
@@ -106,3 +108,31 @@ export const make: <A, E, R>(options: {
106108
close: RcRef.invalidate(ref)
107109
})
108110
})
111+
112+
/**
113+
* @since 1.0.0
114+
* @category Kubernetes
115+
*/
116+
export const makeK8sPod: (
117+
spec: v1.Pod,
118+
options?: {
119+
readonly idleTimeToLive?: Duration.DurationInput | undefined
120+
} | undefined
121+
) => Effect.Effect<
122+
EntityResource<K8sHttpClient.PodStatus>,
123+
never,
124+
Scope.Scope | Sharding | Entity.CurrentAddress | K8sHttpClient.K8sHttpClient
125+
> = Effect.fnUntraced(function*(spec: v1.Pod, options?: {
126+
readonly idleTimeToLive?: Duration.DurationInput | undefined
127+
}) {
128+
const createPod = yield* K8sHttpClient.makeCreatePod
129+
return yield* make({
130+
...options,
131+
acquire: Effect.gen(function*() {
132+
const scope = yield* CloseScope
133+
return yield* createPod(spec).pipe(
134+
Scope.extend(scope)
135+
)
136+
})
137+
})
138+
})
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
/**
2+
* @since 1.0.0
3+
*/
4+
import * as FileSystem from "@effect/platform/FileSystem"
5+
import * as HttpClient from "@effect/platform/HttpClient"
6+
import type * as HttpClientError from "@effect/platform/HttpClientError"
7+
import * as HttpClientRequest from "@effect/platform/HttpClientRequest"
8+
import * as HttpClientResponse from "@effect/platform/HttpClientResponse"
9+
import * as Context from "effect/Context"
10+
import * as Effect from "effect/Effect"
11+
import { identity } from "effect/Function"
12+
import * as Layer from "effect/Layer"
13+
import * as Option from "effect/Option"
14+
import type * as ParseResult from "effect/ParseResult"
15+
import * as Schedule from "effect/Schedule"
16+
import * as Schema from "effect/Schema"
17+
import type * as v1 from "kubernetes-types/core/v1.d.ts"
18+
19+
/**
20+
* @since 1.0.0
21+
* @category Tags
22+
*/
23+
export class K8sHttpClient extends Context.Tag("@effect/cluster/K8sHttpClient")<
24+
K8sHttpClient,
25+
HttpClient.HttpClient
26+
>() {}
27+
28+
/**
29+
* @since 1.0.0
30+
* @category Layers
31+
*/
32+
export const layer: Layer.Layer<
33+
K8sHttpClient,
34+
never,
35+
HttpClient.HttpClient | FileSystem.FileSystem
36+
> = Layer.effect(
37+
K8sHttpClient,
38+
Effect.gen(function*() {
39+
const fs = yield* FileSystem.FileSystem
40+
const token = yield* fs.readFileString("/var/run/secrets/kubernetes.io/serviceaccount/token").pipe(
41+
Effect.option
42+
)
43+
return (yield* HttpClient.HttpClient).pipe(
44+
HttpClient.mapRequest(HttpClientRequest.prependUrl("https://kubernetes.default.svc/api")),
45+
token._tag === "Some" ? HttpClient.mapRequest(HttpClientRequest.bearerToken(token.value.trim())) : identity,
46+
HttpClient.filterStatusOk,
47+
HttpClient.retryTransient({
48+
schedule: Schedule.spaced(5000)
49+
})
50+
)
51+
})
52+
)
53+
54+
/**
55+
* @since 1.0.0
56+
* @category Constructors
57+
*/
58+
export const makeGetPods: (
59+
options?: {
60+
readonly namespace?: string | undefined
61+
readonly labelSelector?: string | undefined
62+
} | undefined
63+
) => Effect.Effect<
64+
Effect.Effect<Map<string, Pod>, HttpClientError.HttpClientError | ParseResult.ParseError, never>,
65+
never,
66+
K8sHttpClient
67+
> = Effect.fnUntraced(function*(options?: {
68+
readonly namespace?: string | undefined
69+
readonly labelSelector?: string | undefined
70+
}) {
71+
const client = yield* K8sHttpClient
72+
73+
const getPods = HttpClientRequest.get(
74+
options?.namespace ? `/v1/namespaces/${options.namespace}/pods` : "/v1/pods"
75+
).pipe(
76+
HttpClientRequest.setUrlParam("fieldSelector", "status.phase=Running"),
77+
options?.labelSelector ? HttpClientRequest.setUrlParam("labelSelector", options.labelSelector) : identity
78+
)
79+
80+
return yield* client.execute(getPods).pipe(
81+
Effect.flatMap(HttpClientResponse.schemaBodyJson(PodList)),
82+
Effect.map((list) => {
83+
const pods = new Map<string, Pod>()
84+
for (let i = 0; i < list.items.length; i++) {
85+
const pod = list.items[i]
86+
pods.set(pod.status.podIP, pod)
87+
}
88+
return pods
89+
}),
90+
Effect.tapErrorCause((cause) => Effect.logWarning("Failed to fetch pods from Kubernetes API", cause)),
91+
Effect.cachedWithTTL("10 seconds")
92+
)
93+
})
94+
95+
/**
96+
* @since 1.0.0
97+
* @category Constructors
98+
*/
99+
export const makeCreatePod = Effect.gen(function*() {
100+
const client = yield* K8sHttpClient
101+
102+
return Effect.fnUntraced(function*(spec: v1.Pod) {
103+
spec = {
104+
apiVersion: "v1",
105+
kind: "Pod",
106+
metadata: {
107+
namespace: "default",
108+
...spec.metadata
109+
},
110+
...spec
111+
}
112+
const namespace = spec.metadata?.namespace ?? "default"
113+
const name = spec.metadata!.name!
114+
const readPodRaw = HttpClientRequest.get(`/v1/namespaces/${namespace}/pods/${name}`).pipe(
115+
client.execute
116+
)
117+
const readPod = readPodRaw.pipe(
118+
Effect.flatMap(HttpClientResponse.schemaBodyJson(Pod)),
119+
Effect.asSome,
120+
Effect.retry({
121+
while: (e) => e._tag === "ParseError",
122+
schedule: Schedule.spaced("1 seconds")
123+
}),
124+
Effect.catchIf((err) => err._tag === "ResponseError" && err.response.status === 404, () => Effect.succeedNone),
125+
Effect.orDie
126+
)
127+
const isPodFound = readPodRaw.pipe(
128+
Effect.as(true),
129+
Effect.catchIf(
130+
(err) => err._tag === "ResponseError" && err.response.status === 404,
131+
() => Effect.succeed(false)
132+
)
133+
)
134+
const createPod = HttpClientRequest.post(`/v1/namespaces/${namespace}/pods`).pipe(
135+
HttpClientRequest.bodyUnsafeJson(spec),
136+
client.execute,
137+
Effect.catchIf(
138+
(err) => err._tag === "ResponseError" && err.response.status === 409,
139+
() => readPod
140+
),
141+
Effect.tapErrorCause(Effect.logInfo),
142+
Effect.orDie
143+
)
144+
const deletePod = HttpClientRequest.del(`/v1/namespaces/${namespace}/pods/${name}`).pipe(
145+
client.execute,
146+
Effect.flatMap((res) => res.json),
147+
Effect.catchIf(
148+
(err) => err._tag === "ResponseError" && err.response.status === 404,
149+
() => Effect.void
150+
),
151+
Effect.tapErrorCause(Effect.logInfo),
152+
Effect.orDie,
153+
Effect.asVoid
154+
)
155+
yield* Effect.addFinalizer(Effect.fnUntraced(function*() {
156+
yield* deletePod
157+
yield* isPodFound.pipe(
158+
Effect.repeat({
159+
until: (found) => !found,
160+
schedule: Schedule.spaced("3 seconds")
161+
}),
162+
Effect.orDie
163+
)
164+
}))
165+
166+
let opod = Option.none<Pod>()
167+
while (Option.isNone(opod) || !opod.value.isReady) {
168+
if (Option.isNone(opod)) {
169+
yield* createPod
170+
}
171+
yield* Effect.sleep("3 seconds")
172+
opod = yield* readPod
173+
}
174+
return opod.value.status
175+
}, Effect.withSpan("K8sHttpClient.createPod"))
176+
})
177+
178+
/**
179+
* @since 1.0.0
180+
* @category Schemas
181+
*/
182+
export class PodStatus extends Schema.Class<PodStatus>("@effect/cluster/K8sHttpClient/PodStatus")({
183+
phase: Schema.String,
184+
conditions: Schema.Array(Schema.Struct({
185+
type: Schema.String,
186+
status: Schema.String,
187+
lastTransitionTime: Schema.String
188+
})),
189+
podIP: Schema.String,
190+
hostIP: Schema.String
191+
}) {}
192+
193+
/**
194+
* @since 1.0.0
195+
* @category Schemas
196+
*/
197+
export class Pod extends Schema.Class<Pod>("@effect/cluster/K8sHttpClient/Pod")({
198+
status: PodStatus
199+
}) {
200+
get isReady(): boolean {
201+
for (let i = 0; i < this.status.conditions.length; i++) {
202+
const condition = this.status.conditions[i]
203+
if (condition.type === "Ready") {
204+
return condition.status === "True"
205+
}
206+
}
207+
return false
208+
}
209+
210+
get isReadyOrInitializing(): boolean {
211+
let initializedAt: string | undefined
212+
let readyAt: string | undefined
213+
for (let i = 0; i < this.status.conditions.length; i++) {
214+
const condition = this.status.conditions[i]
215+
switch (condition.type) {
216+
case "Initialized": {
217+
if (condition.status !== "True") {
218+
return true
219+
}
220+
initializedAt = condition.lastTransitionTime
221+
break
222+
}
223+
case "Ready": {
224+
if (condition.status === "True") {
225+
return true
226+
}
227+
readyAt = condition.lastTransitionTime
228+
break
229+
}
230+
}
231+
}
232+
// if the pod is still booting up, consider it ready as it would have
233+
// already registered itself with RunnerStorage by now
234+
return initializedAt === readyAt
235+
}
236+
}
237+
238+
const PodList = Schema.Struct({
239+
items: Schema.Array(Pod)
240+
})

0 commit comments

Comments
 (0)