Skip to content

Improve usefulness of event_class_to_event_type resolver #1036

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 44 additions & 54 deletions ruby_event_store/lib/ruby_event_store/client.rb
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
# frozen_string_literal: true

require 'concurrent'
require "concurrent"

module RubyEventStore
class Client
def initialize(repository:,
mapper: Mappers::Default.new,
subscriptions: Subscriptions.new,
dispatcher: Dispatcher.new,
clock: default_clock,
correlation_id_generator: default_correlation_id_generator)


@repository = repository
@mapper = mapper
@subscriptions = subscriptions
@broker = Broker.new(subscriptions: subscriptions, dispatcher: dispatcher)
@clock = clock
@metadata = Concurrent::ThreadLocalVar.new
def initialize(
repository:,
mapper: Mappers::Default.new,
subscriptions: Subscriptions.new,
dispatcher: Dispatcher.new,
clock: default_clock,
correlation_id_generator: default_correlation_id_generator
)
@repository = repository
@mapper = mapper
@subscriptions = subscriptions
@broker = Broker.new(subscriptions: subscriptions, dispatcher: dispatcher)
@clock = clock
@metadata = Concurrent::ThreadLocalVar.new
@correlation_id_generator = correlation_id_generator
@event_type_resolver = subscriptions.event_type_resolver
end


# Persists events and notifies subscribed handlers about them
#
# @param events [Array<Event>, Event] event(s)
Expand All @@ -30,13 +30,10 @@ def initialize(repository:,
# @return [self]
def publish(events, stream_name: GLOBAL_STREAM, expected_version: :any)
enriched_events = enrich_events_metadata(events)
records = transform(enriched_events)
records = transform(enriched_events)
append_records_to_stream(records, stream_name: stream_name, expected_version: expected_version)
enriched_events.zip(records) do |event, record|
with_metadata(
correlation_id: event.metadata.fetch(:correlation_id),
causation_id: event.event_id,
) do
with_metadata(correlation_id: event.metadata.fetch(:correlation_id), causation_id: event.event_id) do
broker.(event, record)
end
end
Expand All @@ -51,7 +48,7 @@ def append(events, stream_name: GLOBAL_STREAM, expected_version: :any)
append_records_to_stream(
transform(enrich_events_metadata(events)),
stream_name: stream_name,
expected_version: expected_version
expected_version: expected_version,
)
self
end
Expand Down Expand Up @@ -109,7 +106,7 @@ def streams_of(event_id)
def subscribe(subscriber = nil, to:, &proc)
raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
subscriber ||= proc
broker.add_subscription(subscriber, to)
broker.add_subscription(subscriber, to.map(&event_type_resolver))
end

# Subscribes a handler (subscriber) that will be invoked for all published events
Expand Down Expand Up @@ -139,11 +136,12 @@ def subscribers_for(event_class)
# which are active only during the invocation of the provided
# block of code.
class Within
def initialize(block, broker)
def initialize(block, broker, resolver)
@block = block
@broker = broker
@global_subscribers = []
@subscribers = Hash.new {[]}
@subscribers = Hash.new { [] }
@resolver = resolver
end

# Subscribes temporary handlers that
Expand Down Expand Up @@ -175,9 +173,9 @@ def subscribe_to_all_events(*handlers, &handler2)
# @param to [Array<Class>] types of events to subscribe
# @param handler [Proc] handler passed as proc
# @return [self]
def subscribe(handler=nil, to:, &handler2)
def subscribe(handler = nil, to:, &handler2)
raise ArgumentError if handler && handler2
@subscribers[handler || handler2] += Array(to)
@subscribers[handler || handler2] += Array(to).map(&resolver)
self
end

Expand All @@ -187,7 +185,7 @@ def subscribe(handler=nil, to:, &handler2)
#
# @return [Object] value returned by the invoked block of code
def call
unsubs = add_thread_global_subscribers
unsubs = add_thread_global_subscribers
unsubs += add_thread_subscribers
@block.call
ensure
Expand All @@ -196,16 +194,14 @@ def call

private

attr_reader :resolver

def add_thread_subscribers
@subscribers.map do |subscriber, types|
@broker.add_thread_subscription(subscriber, types)
end
@subscribers.map { |subscriber, types| @broker.add_thread_subscription(subscriber, types) }
end

def add_thread_global_subscribers
@global_subscribers.map do |subscriber|
@broker.add_thread_global_subscription(subscriber)
end
@global_subscribers.map { |subscriber| @broker.add_thread_global_subscription(subscriber) }
end
end

Expand All @@ -216,7 +212,7 @@ def add_thread_global_subscribers
# @return [Within] builder object which collects temporary subscriptions
def within(&block)
raise ArgumentError if block.nil?
Within.new(block, broker)
Within.new(block, broker, event_type_resolver)
end

# Set additional metadata for all events published within the provided block
Expand All @@ -238,19 +234,17 @@ def with_metadata(metadata, &block)
#
# @return [Event] deserialized event
def deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil)
extract_timestamp = lambda do |m|
(m[:timestamp] || Time.parse(m.fetch('timestamp'))).iso8601
end
extract_timestamp = lambda { |m| (m[:timestamp] || Time.parse(m.fetch("timestamp"))).iso8601 }

mapper.record_to_event(
SerializedRecord.new(
event_type: event_type,
event_id: event_id,
data: data,
metadata: metadata,
timestamp: timestamp || timestamp_ = extract_timestamp[serializer.load(metadata)],
valid_at: valid_at || timestamp_,
).deserialize(serializer)
event_id: event_id,
data: data,
metadata: metadata,
timestamp: timestamp || timestamp_ = extract_timestamp[serializer.load(metadata)],
valid_at: valid_at || timestamp_,
).deserialize(serializer),
)
end

Expand Down Expand Up @@ -311,14 +305,14 @@ def transform(events)

def enrich_events_metadata(events)
events = Array(events)
events.each{|event| enrich_event_metadata(event) }
events.each { |event| enrich_event_metadata(event) }
events
end

def enrich_event_metadata(event)
metadata.each { |key, value| event.metadata[key] ||= value }
event.metadata[:timestamp] ||= clock.call
event.metadata[:valid_at] ||= event.metadata.fetch(:timestamp)
event.metadata[:timestamp] ||= clock.call
event.metadata[:valid_at] ||= event.metadata.fetch(:timestamp)
event.metadata[:correlation_id] ||= correlation_id_generator.call
end

Expand All @@ -328,22 +322,18 @@ def append_records_to_stream(records, stream_name:, expected_version:)

protected

def event_type_resolver
subscriptions.event_type_resolver
end

def metadata=(value)
@metadata.value = value
end

def default_clock
->{ Time.now.utc.round(TIMESTAMP_PRECISION) }
-> { Time.now.utc.round(TIMESTAMP_PRECISION) }
end

def default_correlation_id_generator
->{ SecureRandom.uuid }
-> { SecureRandom.uuid }
end

attr_reader :repository, :mapper, :subscriptions, :broker, :clock, :correlation_id_generator
attr_reader :repository, :mapper, :subscriptions, :broker, :clock, :correlation_id_generator, :event_type_resolver
end
end
127 changes: 64 additions & 63 deletions ruby_event_store/lib/ruby_event_store/spec/subscriptions_lint.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,109 +17,110 @@ def call(event)

subject(:subscriptions) { subscriptions_class.new }

it 'returns all subscribed handlers' do
handler = TestHandler.new
it "returns all subscribed handlers" do
handler = TestHandler.new
another_handler = TestHandler.new
global_handler = TestHandler.new
global_handler = TestHandler.new

subscriptions.add_subscription(handler, [Test1DomainEvent, Test3DomainEvent])
subscriptions.add_subscription(another_handler, [Test2DomainEvent])
subscriptions.add_subscription(handler, %w[Test1DomainEvent Test3DomainEvent])
subscriptions.add_subscription(another_handler, ["Test2DomainEvent"])
subscriptions.add_global_subscription(global_handler)

expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler, global_handler])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([another_handler, global_handler])
expect(subscriptions.all_for('Test3DomainEvent')).to eq([handler, global_handler])
expect(subscriptions.all_for("Test1DomainEvent")).to eq([handler, global_handler])
expect(subscriptions.all_for("Test2DomainEvent")).to eq([another_handler, global_handler])
expect(subscriptions.all_for("Test3DomainEvent")).to eq([handler, global_handler])
end

it 'returns subscribed thread handlers' do
handler = TestHandler.new
it "returns subscribed thread handlers" do
handler = TestHandler.new
another_handler = TestHandler.new
global_handler = TestHandler.new
global_handler = TestHandler.new

subscriptions.add_thread_subscription(handler, [Test1DomainEvent, Test3DomainEvent])
subscriptions.add_thread_subscription(another_handler, [Test2DomainEvent])
subscriptions.add_thread_subscription(handler, %w[Test1DomainEvent Test3DomainEvent])
subscriptions.add_thread_subscription(another_handler, ["Test2DomainEvent"])
subscriptions.add_thread_global_subscription(global_handler)
t = Thread.new do
subscriptions.add_thread_subscription(handler, [Test2DomainEvent])
subscriptions.add_thread_global_subscription(another_handler)
expect(subscriptions.all_for('Test2DomainEvent')).to eq([another_handler, handler])
end

expect(subscriptions.all_for('Test1DomainEvent')).to eq([global_handler, handler])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([global_handler, another_handler])
expect(subscriptions.all_for('Test3DomainEvent')).to eq([global_handler, handler])
t =
Thread.new do
subscriptions.add_thread_subscription(handler, ["Test2DomainEvent"])
subscriptions.add_thread_global_subscription(another_handler)
expect(subscriptions.all_for("Test2DomainEvent")).to eq([another_handler, handler])
end

expect(subscriptions.all_for("Test1DomainEvent")).to eq([global_handler, handler])
expect(subscriptions.all_for("Test2DomainEvent")).to eq([global_handler, another_handler])
expect(subscriptions.all_for("Test3DomainEvent")).to eq([global_handler, handler])
t.join
end

it 'returns lambda as an output of global subscribe methods' do
handler = TestHandler.new
it "returns lambda as an output of global subscribe methods" do
handler = TestHandler.new
result = subscriptions.add_global_subscription(handler)
expect(result).to respond_to(:call)
end

it 'returns lambda as an output of subscribe methods' do
handler = TestHandler.new
result = subscriptions.add_subscription(handler, [Test1DomainEvent, Test2DomainEvent])
it "returns lambda as an output of subscribe methods" do
handler = TestHandler.new
result = subscriptions.add_subscription(handler, [Test1DomainEvent, Test2DomainEvent])
expect(result).to respond_to(:call)
end

it 'revokes global subscription' do
handler = TestHandler.new
it "revokes global subscription" do
handler = TestHandler.new

revoke = subscriptions.add_global_subscription(handler)
expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([handler])
revoke = subscriptions.add_global_subscription(handler)
expect(subscriptions.all_for("Test1DomainEvent")).to eq([handler])
expect(subscriptions.all_for("Test2DomainEvent")).to eq([handler])
revoke.()
expect(subscriptions.all_for('Test1DomainEvent')).to eq([])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([])
expect(subscriptions.all_for("Test1DomainEvent")).to eq([])
expect(subscriptions.all_for("Test2DomainEvent")).to eq([])
end

it 'revokes subscription' do
handler = TestHandler.new
it "revokes subscription" do
handler = TestHandler.new

revoke = subscriptions.add_subscription(handler, [Test1DomainEvent, Test2DomainEvent])
expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([handler])
revoke = subscriptions.add_subscription(handler, %w[Test1DomainEvent Test2DomainEvent])
expect(subscriptions.all_for("Test1DomainEvent")).to eq([handler])
expect(subscriptions.all_for("Test2DomainEvent")).to eq([handler])
revoke.()
expect(subscriptions.all_for('Test1DomainEvent')).to eq([])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([])
expect(subscriptions.all_for("Test1DomainEvent")).to eq([])
expect(subscriptions.all_for("Test2DomainEvent")).to eq([])
end

it 'revokes thread global subscription' do
handler = TestHandler.new
it "revokes thread global subscription" do
handler = TestHandler.new

revoke = subscriptions.add_thread_global_subscription(handler)
expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([handler])
revoke = subscriptions.add_thread_global_subscription(handler)
expect(subscriptions.all_for("Test1DomainEvent")).to eq([handler])
expect(subscriptions.all_for("Test2DomainEvent")).to eq([handler])
revoke.()
expect(subscriptions.all_for('Test1DomainEvent')).to eq([])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([])
expect(subscriptions.all_for("Test1DomainEvent")).to eq([])
expect(subscriptions.all_for("Test2DomainEvent")).to eq([])
end

it 'revokes thread subscription' do
handler = TestHandler.new
it "revokes thread subscription" do
handler = TestHandler.new

revoke = subscriptions.add_thread_subscription(handler, [Test1DomainEvent, Test2DomainEvent])
expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([handler])
revoke = subscriptions.add_thread_subscription(handler, %w[Test1DomainEvent Test2DomainEvent])
expect(subscriptions.all_for("Test1DomainEvent")).to eq([handler])
expect(subscriptions.all_for("Test2DomainEvent")).to eq([handler])
revoke.()
expect(subscriptions.all_for('Test1DomainEvent')).to eq([])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([])
expect(subscriptions.all_for("Test1DomainEvent")).to eq([])
expect(subscriptions.all_for("Test2DomainEvent")).to eq([])
end

it 'subscribes by type of event which is a String' do
handler = TestHandler.new
it "subscribes by type of event which is a String" do
handler = TestHandler.new
subscriptions.add_subscription(handler, ["Test1DomainEvent"])
subscriptions.add_thread_subscription(handler, ["Test1DomainEvent"])

expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler, handler])
expect(subscriptions.all_for("Test1DomainEvent")).to eq([handler, handler])
end

it 'subscribes by type of event which is a class' do
handler = TestHandler.new
subscriptions.add_subscription(handler, [Test1DomainEvent])
subscriptions.add_thread_subscription(handler, [Test1DomainEvent])
it "subscribes by type of event which is a class" do
handler = TestHandler.new
subscriptions.add_subscription(handler, ["Test1DomainEvent"])
subscriptions.add_thread_subscription(handler, ["Test1DomainEvent"])

expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler, handler])
expect(subscriptions.all_for("Test1DomainEvent")).to eq([handler, handler])
end
end
Loading
Loading