A Kafka Akka Extension - abstracts the Kafka publisher and consumer APIs. Supports cluster data distribution and decoupling in the stream.
import akka.actor.ActorSystem
import com.tuplejump.continuum.Kafka

object NodeA extends App {
  val system: ActorSystem = ... // create the ActorSystem and manage its lifecycle
  val kafka = Kafka(system)
}
Basic source:
kafka log data
Basic sink (consumers):
val receivers = List(subscriberA, subscriberB)
kafka.sink(topics, receivers)
Send data to Kafka using the application's load-time configuration (from the deployment environment/config), which populates the default producer.
Here data is a simple JSON String and to is the destination topic:
kafka.log(data, to)
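For example, a minimal sketch (the topic name and JSON payload are hypothetical, and a broker reachable from the configured environment is assumed):

val to = "user.events"                          // hypothetical topic name
val data = """{"user":"a1","action":"login"}""" // simple JSON payload as a String
kafka.log(data, to)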
Alternatively, use a SourceEvent directly, again with the default source. A SourceEvent carries metadata: an event type hint, a nonce derived from an MD5 hash, and the UTC timestamp of ingestion/creation. This allows traceability throughout the system. Note that the metadata is not yet serialized to Kafka - see the Roadmap.
import com.tuplejump.continuum.ClusterProtocol._

val etype = classOf[YourEventType]
val data: Array[Byte] = yourDataBytes

val event = SourceEvent(etype, data, to) // to: the destination topic, as above
kafka log event
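For reference, the ingredients of that metadata can be illustrated with standard Java APIs. This is purely illustrative, not the library's implementation, and what exactly is hashed is an assumption:

import java.security.MessageDigest
import java.time.Instant

val nonce     = MessageDigest.getInstance("MD5").digest(data) // MD5-based nonce, here over the payload bytes
val createdAt = Instant.now()                                 // UTC timestamp of ingestion/creation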
// Create a source with the default configuration:
val source = kafka.source

// Or pass additional configuration:
val additional = Map(...configs..)
val source = kafka.source(additional)
import scala.reflect.{ClassTag, classTag}

def inboundStream[A: ClassTag](data: A, to: String): Unit = {
  val bytes: Array[Byte] = data.toBytes // assumes a serializer in scope providing toBytes for A
  source ! SourceEvent(classTag[A].runtimeClass, bytes, to)
}
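A brief usage sketch: the configuration keys are standard Kafka producer properties, though whether kafka.source forwards them verbatim is an assumption, and the topic and payload are hypothetical:

val additional = Map("acks" -> "all", "compression.type" -> "lz4")
val source = kafka.source(additional)

val payload = """{"user":"a1","action":"login"}""".getBytes("UTF-8")
source ! SourceEvent(classOf[String], payload, "user.events")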
Start by creating your receivers:
import akka.actor.Actor
import com.tuplejump.continuum.ClusterProtocol.SinkEvent

class SimpleSubscriberA extends Actor {

  def receive: Actor.Receive = {
    case SinkEvent(_, data, _, topic, _) => process(data)
  }

  def process(data: Array[Byte]): Unit = { ... }
}
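Then instantiate them as ordinary actors. The actor names here are arbitrary, and a second subscriber class would work the same way:

import akka.actor.Props

val subscriberA = system.actorOf(Props[SimpleSubscriberA], "subscriber-a")
val subscriberB = system.actorOf(Props[SimpleSubscriberA], "subscriber-b")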
Pass one or more of them into the new Sink:
val receivers = List(subscriberA, subscriberB)
kafka.sink(topics, receivers)
Or simply, to subscribe to streams:
Kafka(actorSystem).sink(topics, receivers)
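Putting it together, a hedged end-to-end sketch: the collection type for topics and the topic name are assumptions, and subscribers and the actor system are as defined above:

val topics    = Set("user.events")
val receivers = List(subscriberA, subscriberB)

Kafka(system).sink(topics, receivers)                                   // subscribe: fan records out to the actors
Kafka(system).log("""{"user":"a1","action":"login"}""", "user.events")  // publish via the default producer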