Skip to content

Commit f1bbca0

Browse files
authored
Merge pull request #74 from cr-org/type-level-builders
Type-level builders
2 parents 7554fd8 + 3ace7bb commit f1bbca0

File tree

8 files changed

+201
-105
lines changed

8 files changed

+201
-105
lines changed

build.sbt

+2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import Dependencies._
22
import Settings._
33

4+
scalaVersion in ThisBuild := "2.13.2"
5+
46
lazy val `neutron-core` = (project in file("core"))
57
.enablePlugins(AutomateHeaderPlugin)
68
.settings(commonSettings)

core/src/it/scala/cr/pulsar/PulsarSpec.scala

+22-6
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,27 @@ import java.util.UUID
2626

2727
class PulsarSpec extends PulsarSuite {
2828

29-
val sub = (s: String) => Subscription(Subscription.Name(s)).withType(Subscription.Type.Failover)
30-
val topic = (s: String) => Topic(Topic.Name(s), cfg)
29+
val sub = (s: String) =>
30+
Subscription.Builder
31+
.withName(Subscription.Name(s))
32+
.withType(Subscription.Type.Failover)
33+
.build
34+
35+
val topic = (s: String) =>
36+
Topic.Builder
37+
.withName(Topic.Name(s))
38+
.withConfig(cfg)
39+
.build
40+
3141
val batch = Producer.Batching.Disabled
3242
val shard = (_: Event) => Producer.MessageKey.Default
3343

3444
withPulsarClient { client =>
3545
test("A message is published and consumed successfully") {
3646
val res: Resource[IO, (Consumer[IO, Event], Producer[IO, Event])] =
3747
for {
38-
consumer <- Consumer.create[IO, Event](client, topic("happy-path"), sub("happy-path"))
48+
consumer <- Consumer
49+
.create[IO, Event](client, topic("happy-path"), sub("happy-path"))
3950
producer <- Producer.create[IO, Event](client, topic("happy-path"))
4051
} yield consumer -> producer
4152

@@ -68,7 +79,8 @@ class PulsarSpec extends PulsarSuite {
6879
test("A message is published and nack'ed when consumer fails to decode it") {
6980
val res: Resource[IO, (Consumer[IO, Event], Producer[IO, String])] =
7081
for {
71-
consumer <- Consumer.create[IO, Event](client, topic("auto-nack"), sub("auto-nack"))
82+
consumer <- Consumer
83+
.create[IO, Event](client, topic("auto-nack"), sub("auto-nack"))
7284
producer <- Producer.create[IO, String](client, topic("auto-nack"))
7385
} yield consumer -> producer
7486

@@ -80,7 +92,8 @@ class PulsarSpec extends PulsarSuite {
8092
val consume =
8193
consumer.autoSubscribe
8294
.handleErrorWith {
83-
case Consumer.DecodingFailure(data) => Stream.eval(latch.complete(data))
95+
case Consumer.DecodingFailure(data) =>
96+
Stream.eval(latch.complete(data))
8497
}
8598

8699
val testMessage = "Consumer will fail to decode this message"
@@ -105,7 +118,10 @@ class PulsarSpec extends PulsarSuite {
105118
) {
106119
val makeSub =
107120
(n: String) =>
108-
Subscription(Subscription.Name(n)).withType(Subscription.Type.KeyShared)
121+
Subscription.Builder
122+
.withName(Subscription.Name(n))
123+
.withType(Subscription.Type.KeyShared)
124+
.build
109125

110126
val opts =
111127
Producer.Options[IO, Event]().withShardKey(_.shardKey).withBatching(batch)

core/src/it/scala/cr/pulsar/PulsarSuite.scala

+2-3
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ abstract class PulsarSuite extends FunSuite {
4141

4242
override def beforeAll(): Unit = {
4343
super.beforeAll()
44-
val (cli, release) =
45-
Pulsar.create[IO](cfg.serviceUrl).allocated.unsafeRunSync()
44+
val (cli, release) = Pulsar.create[IO](cfg.url).allocated.unsafeRunSync()
4645
this.client = cli
4746
this.close = release
4847
latch.complete(()).unsafeRunSync()
@@ -80,6 +79,6 @@ abstract class PulsarSuite extends FunSuite {
8079
}
8180
}
8281

83-
lazy val cfg = Config.Default
82+
lazy val cfg = Config.Builder.default
8483

8584
}

core/src/main/scala/cr/pulsar/Config.scala

+56-33
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package cr.pulsar
1818

1919
import Config._
2020
import io.estatico.newtype.macros._
21+
import scala.annotation.implicitNotFound
2122

2223
/**
2324
* Basic Pulsar configuration to establish
@@ -26,47 +27,69 @@ import io.estatico.newtype.macros._
2627
sealed abstract class Config {
2728
val tenant: PulsarTenant
2829
val namespace: PulsarNamespace
29-
val serviceUrl: PulsarURL
30-
def withTenant(_tenant: PulsarTenant): Config
31-
def withNamespace(_namespace: PulsarNamespace): Config
32-
def withURL(_url: PulsarURL): Config
30+
val url: PulsarURL
3331
}
3432

3533
object Config {
3634
@newtype case class PulsarTenant(value: String)
3735
@newtype case class PulsarNamespace(value: String)
3836
@newtype case class PulsarURL(value: String)
3937

40-
private case class ConfigImpl(
41-
tenant: PulsarTenant,
42-
namespace: PulsarNamespace,
43-
serviceUrl: PulsarURL
44-
) extends Config {
45-
def withTenant(_tenant: PulsarTenant): Config =
46-
copy(tenant = _tenant)
47-
def withNamespace(_namespace: PulsarNamespace): Config =
48-
copy(namespace = _namespace)
49-
def withURL(_serviceUrl: PulsarURL): Config =
50-
copy(serviceUrl = _serviceUrl)
38+
/**************** Type-level builder ******************/
39+
sealed trait Info
40+
object Info {
41+
sealed trait Empty extends Info
42+
sealed trait Namespace extends Info
43+
sealed trait Tenant extends Info
44+
sealed trait URL extends Info
45+
46+
type Mandatory = Empty with Namespace with Tenant with URL
47+
}
48+
49+
case class ConfigBuilder[I <: Info] protected (
50+
_tenant: PulsarTenant = PulsarTenant(""),
51+
_namespace: PulsarNamespace = PulsarNamespace(""),
52+
_url: PulsarURL = PulsarURL("")
53+
) {
54+
def withTenant(tenant: PulsarTenant): ConfigBuilder[I with Info.Tenant] =
55+
this.copy(_tenant = tenant)
56+
57+
def withNameSpace(namespace: PulsarNamespace): ConfigBuilder[I with Info.Namespace] =
58+
this.copy(_namespace = namespace)
59+
60+
def withURL(url: PulsarURL): ConfigBuilder[I with Info.URL] =
61+
this.copy(_url = url)
62+
63+
/**
64+
* It creates a new configuration.
65+
*/
66+
def build(
67+
implicit @implicitNotFound(
68+
"Tenant, Namespace and URL are mandatory. To create a default configuration, use Config.Builder.default instead."
69+
) ev: I =:= Info.Mandatory
70+
): Config =
71+
new Config {
72+
val tenant = _tenant
73+
val namespace = _namespace
74+
val url = _url
75+
}
76+
77+
/**
78+
* It creates a new configuration with the following default values:
79+
*
80+
* - tenant: "public"
81+
* - namespace: "default"
82+
* - url: "pulsar://localhost:6650"
83+
*/
84+
def default: Config =
85+
Config.Builder
86+
.withTenant(PulsarTenant("public"))
87+
.withNameSpace(PulsarNamespace("default"))
88+
.withURL(PulsarURL("pulsar://localhost:6650"))
89+
.build
90+
5191
}
5292

53-
def apply(
54-
tenant: PulsarTenant,
55-
namespace: PulsarNamespace,
56-
serviceUrl: PulsarURL
57-
): Config = ConfigImpl(tenant, namespace, serviceUrl)
93+
object Builder extends ConfigBuilder[Info.Empty]()
5894

59-
/**
60-
* It creates a default configuration.
61-
*
62-
* - tenant: "public"
63-
* - namespace: "default"
64-
* - url: "pulsar://localhost:6650"
65-
*/
66-
def Default: Config =
67-
ConfigImpl(
68-
PulsarTenant("public"),
69-
PulsarNamespace("default"),
70-
PulsarURL("pulsar://localhost:6650")
71-
)
7295
}

core/src/main/scala/cr/pulsar/Subscription.scala

+44-21
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,12 @@ package cr.pulsar
1818

1919
import io.estatico.newtype.macros.newtype
2020
import org.apache.pulsar.client.api.{ SubscriptionMode, SubscriptionType }
21+
import scala.annotation.implicitNotFound
2122

22-
// Builder-style abstract class instead of case class to allow for bincompat-friendly extension in future versions.
2323
sealed abstract class Subscription {
2424
val name: Subscription.Name
2525
val `type`: Subscription.Type
2626
val mode: Subscription.Mode
27-
def withType(_type: Subscription.Type): Subscription
28-
def withMode(_mode: Subscription.Mode): Subscription
2927
}
3028

3129
/**
@@ -73,25 +71,50 @@ object Subscription {
7371
}
7472
}
7573

76-
private case class SubscriptionImpl(
77-
name: Subscription.Name,
78-
`type`: Subscription.Type,
79-
mode: Subscription.Mode
80-
) extends Subscription {
81-
def withType(_type: Subscription.Type): Subscription =
82-
copy(`type` = _type)
83-
def withMode(_mode: Subscription.Mode): Subscription =
84-
copy(mode = _mode)
74+
/**************** Type-level builder ******************/
75+
sealed trait Info
76+
object Info {
77+
sealed trait Empty extends Info
78+
sealed trait Name extends Info
79+
sealed trait Mode extends Info
80+
sealed trait Type extends Info
81+
82+
type Mandatory = Empty with Name with Mode with Type
83+
}
84+
85+
case class SubscriptionBuilder[I <: Info] protected (
86+
_name: Name = Name(""),
87+
_type: Type = Type.Exclusive,
88+
_mode: Mode = Mode.Durable
89+
) {
90+
def withName(name: Name): SubscriptionBuilder[I with Info.Name] =
91+
this.copy(_name = Name(s"${name.value}-subscription"))
92+
93+
def withMode(mode: Mode): SubscriptionBuilder[I with Info.Mode] =
94+
this.copy(_mode = mode)
95+
96+
def withType(typ: Type): SubscriptionBuilder[I with Info.Type] =
97+
this.copy(_type = typ)
98+
99+
/**
100+
* It creates a subscription with default configuration.
101+
*
102+
* - type: Exclusive
103+
* - mode: Durable
104+
*/
105+
def build(
106+
implicit @implicitNotFound(
107+
"Subscription.Name is mandatory. By default Type=Exclusive and Mode=Durable."
108+
) ev: I =:= Info.Mandatory
109+
): Subscription =
110+
new Subscription {
111+
val name = _name
112+
val `type` = _type
113+
val mode = _mode
114+
}
115+
85116
}
86117

87-
/**
88-
* It creates a subscription with default configuration.
89-
*
90-
* - type: Exclusive
91-
* - mode: Durable
92-
*/
93-
def apply(name: Name): Subscription =
94-
// Same as Java's defaults: https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java#L62
95-
SubscriptionImpl(Name(s"${name.value}-subscription"), Type.Exclusive, Mode.Durable)
118+
object Builder extends SubscriptionBuilder[Info.Empty with Info.Type with Info.Mode]()
96119

97120
}

0 commit comments

Comments
 (0)