diff --git a/ruby_event_store/lib/ruby_event_store/client.rb b/ruby_event_store/lib/ruby_event_store/client.rb index 8ed4786992..edc69d1e0f 100644 --- a/ruby_event_store/lib/ruby_event_store/client.rb +++ b/ruby_event_store/lib/ruby_event_store/client.rb @@ -1,27 +1,27 @@ # frozen_string_literal: true -require 'concurrent' +require "concurrent" module RubyEventStore class Client - def initialize(repository:, - mapper: Mappers::Default.new, - subscriptions: Subscriptions.new, - dispatcher: Dispatcher.new, - clock: default_clock, - correlation_id_generator: default_correlation_id_generator) - - - @repository = repository - @mapper = mapper - @subscriptions = subscriptions - @broker = Broker.new(subscriptions: subscriptions, dispatcher: dispatcher) - @clock = clock - @metadata = Concurrent::ThreadLocalVar.new + def initialize( + repository:, + mapper: Mappers::Default.new, + subscriptions: Subscriptions.new, + dispatcher: Dispatcher.new, + clock: default_clock, + correlation_id_generator: default_correlation_id_generator + ) + @repository = repository + @mapper = mapper + @subscriptions = subscriptions + @broker = Broker.new(subscriptions: subscriptions, dispatcher: dispatcher) + @clock = clock + @metadata = Concurrent::ThreadLocalVar.new @correlation_id_generator = correlation_id_generator + @event_type_resolver = subscriptions.event_type_resolver end - # Persists events and notifies subscribed handlers about them # # @param events [Array, Event] event(s) @@ -30,13 +30,10 @@ def initialize(repository:, # @return [self] def publish(events, stream_name: GLOBAL_STREAM, expected_version: :any) enriched_events = enrich_events_metadata(events) - records = transform(enriched_events) + records = transform(enriched_events) append_records_to_stream(records, stream_name: stream_name, expected_version: expected_version) enriched_events.zip(records) do |event, record| - with_metadata( - correlation_id: event.metadata.fetch(:correlation_id), - causation_id: event.event_id, - ) do + with_metadata(correlation_id: event.metadata.fetch(:correlation_id), causation_id: event.event_id) do broker.(event, record) end end @@ -51,7 +48,7 @@ def append(events, stream_name: GLOBAL_STREAM, expected_version: :any) append_records_to_stream( transform(enrich_events_metadata(events)), stream_name: stream_name, - expected_version: expected_version + expected_version: expected_version, ) self end @@ -109,7 +106,7 @@ def streams_of(event_id) def subscribe(subscriber = nil, to:, &proc) raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc subscriber ||= proc - broker.add_subscription(subscriber, to) + broker.add_subscription(subscriber, to.map(&event_type_resolver)) end # Subscribes a handler (subscriber) that will be invoked for all published events @@ -139,11 +136,12 @@ def subscribers_for(event_class) # which are active only during the invocation of the provided # block of code. class Within - def initialize(block, broker) + def initialize(block, broker, resolver) @block = block @broker = broker @global_subscribers = [] - @subscribers = Hash.new {[]} + @subscribers = Hash.new { [] } + @resolver = resolver end # Subscribes temporary handlers that @@ -175,9 +173,9 @@ def subscribe_to_all_events(*handlers, &handler2) # @param to [Array] types of events to subscribe # @param handler [Proc] handler passed as proc # @return [self] - def subscribe(handler=nil, to:, &handler2) + def subscribe(handler = nil, to:, &handler2) raise ArgumentError if handler && handler2 - @subscribers[handler || handler2] += Array(to) + @subscribers[handler || handler2] += Array(to).map(&resolver) self end @@ -187,7 +185,7 @@ def subscribe(handler=nil, to:, &handler2) # # @return [Object] value returned by the invoked block of code def call - unsubs = add_thread_global_subscribers + unsubs = add_thread_global_subscribers unsubs += add_thread_subscribers @block.call ensure @@ -196,16 +194,14 @@ def call private + attr_reader :resolver + def add_thread_subscribers - @subscribers.map do |subscriber, types| - @broker.add_thread_subscription(subscriber, types) - end + @subscribers.map { |subscriber, types| @broker.add_thread_subscription(subscriber, types) } end def add_thread_global_subscribers - @global_subscribers.map do |subscriber| - @broker.add_thread_global_subscription(subscriber) - end + @global_subscribers.map { |subscriber| @broker.add_thread_global_subscription(subscriber) } end end @@ -216,7 +212,7 @@ def add_thread_global_subscribers # @return [Within] builder object which collects temporary subscriptions def within(&block) raise ArgumentError if block.nil? - Within.new(block, broker) + Within.new(block, broker, event_type_resolver) end # Set additional metadata for all events published within the provided block @@ -238,19 +234,17 @@ def with_metadata(metadata, &block) # # @return [Event] deserialized event def deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil) - extract_timestamp = lambda do |m| - (m[:timestamp] || Time.parse(m.fetch('timestamp'))).iso8601 - end + extract_timestamp = lambda { |m| (m[:timestamp] || Time.parse(m.fetch("timestamp"))).iso8601 } mapper.record_to_event( SerializedRecord.new( event_type: event_type, - event_id: event_id, - data: data, - metadata: metadata, - timestamp: timestamp || timestamp_ = extract_timestamp[serializer.load(metadata)], - valid_at: valid_at || timestamp_, - ).deserialize(serializer) + event_id: event_id, + data: data, + metadata: metadata, + timestamp: timestamp || timestamp_ = extract_timestamp[serializer.load(metadata)], + valid_at: valid_at || timestamp_, + ).deserialize(serializer), ) end @@ -311,14 +305,14 @@ def transform(events) def enrich_events_metadata(events) events = Array(events) - events.each{|event| enrich_event_metadata(event) } + events.each { |event| enrich_event_metadata(event) } events end def enrich_event_metadata(event) metadata.each { |key, value| event.metadata[key] ||= value } - event.metadata[:timestamp] ||= clock.call - event.metadata[:valid_at] ||= event.metadata.fetch(:timestamp) + event.metadata[:timestamp] ||= clock.call + event.metadata[:valid_at] ||= event.metadata.fetch(:timestamp) event.metadata[:correlation_id] ||= correlation_id_generator.call end @@ -328,22 +322,18 @@ def append_records_to_stream(records, stream_name:, expected_version:) protected - def event_type_resolver - subscriptions.event_type_resolver - end - def metadata=(value) @metadata.value = value end def default_clock - ->{ Time.now.utc.round(TIMESTAMP_PRECISION) } + -> { Time.now.utc.round(TIMESTAMP_PRECISION) } end def default_correlation_id_generator - ->{ SecureRandom.uuid } + -> { SecureRandom.uuid } end - attr_reader :repository, :mapper, :subscriptions, :broker, :clock, :correlation_id_generator + attr_reader :repository, :mapper, :subscriptions, :broker, :clock, :correlation_id_generator, :event_type_resolver end end diff --git a/ruby_event_store/lib/ruby_event_store/spec/subscriptions_lint.rb b/ruby_event_store/lib/ruby_event_store/spec/subscriptions_lint.rb index 5abf3a2591..2122d0a794 100644 --- a/ruby_event_store/lib/ruby_event_store/spec/subscriptions_lint.rb +++ b/ruby_event_store/lib/ruby_event_store/spec/subscriptions_lint.rb @@ -17,109 +17,110 @@ def call(event) subject(:subscriptions) { subscriptions_class.new } - it 'returns all subscribed handlers' do - handler = TestHandler.new + it "returns all subscribed handlers" do + handler = TestHandler.new another_handler = TestHandler.new - global_handler = TestHandler.new + global_handler = TestHandler.new - subscriptions.add_subscription(handler, [Test1DomainEvent, Test3DomainEvent]) - subscriptions.add_subscription(another_handler, [Test2DomainEvent]) + subscriptions.add_subscription(handler, %w[Test1DomainEvent Test3DomainEvent]) + subscriptions.add_subscription(another_handler, ["Test2DomainEvent"]) subscriptions.add_global_subscription(global_handler) - expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler, global_handler]) - expect(subscriptions.all_for('Test2DomainEvent')).to eq([another_handler, global_handler]) - expect(subscriptions.all_for('Test3DomainEvent')).to eq([handler, global_handler]) + expect(subscriptions.all_for("Test1DomainEvent")).to eq([handler, global_handler]) + expect(subscriptions.all_for("Test2DomainEvent")).to eq([another_handler, global_handler]) + expect(subscriptions.all_for("Test3DomainEvent")).to eq([handler, global_handler]) end - it 'returns subscribed thread handlers' do - handler = TestHandler.new + it "returns subscribed thread handlers" do + handler = TestHandler.new another_handler = TestHandler.new - global_handler = TestHandler.new + global_handler = TestHandler.new - subscriptions.add_thread_subscription(handler, [Test1DomainEvent, Test3DomainEvent]) - subscriptions.add_thread_subscription(another_handler, [Test2DomainEvent]) + subscriptions.add_thread_subscription(handler, %w[Test1DomainEvent Test3DomainEvent]) + subscriptions.add_thread_subscription(another_handler, ["Test2DomainEvent"]) subscriptions.add_thread_global_subscription(global_handler) - t = Thread.new do - subscriptions.add_thread_subscription(handler, [Test2DomainEvent]) - subscriptions.add_thread_global_subscription(another_handler) - expect(subscriptions.all_for('Test2DomainEvent')).to eq([another_handler, handler]) - end - - expect(subscriptions.all_for('Test1DomainEvent')).to eq([global_handler, handler]) - expect(subscriptions.all_for('Test2DomainEvent')).to eq([global_handler, another_handler]) - expect(subscriptions.all_for('Test3DomainEvent')).to eq([global_handler, handler]) + t = + Thread.new do + subscriptions.add_thread_subscription(handler, ["Test2DomainEvent"]) + subscriptions.add_thread_global_subscription(another_handler) + expect(subscriptions.all_for("Test2DomainEvent")).to eq([another_handler, handler]) + end + + expect(subscriptions.all_for("Test1DomainEvent")).to eq([global_handler, handler]) + expect(subscriptions.all_for("Test2DomainEvent")).to eq([global_handler, another_handler]) + expect(subscriptions.all_for("Test3DomainEvent")).to eq([global_handler, handler]) t.join end - it 'returns lambda as an output of global subscribe methods' do - handler = TestHandler.new + it "returns lambda as an output of global subscribe methods" do + handler = TestHandler.new result = subscriptions.add_global_subscription(handler) expect(result).to respond_to(:call) end - it 'returns lambda as an output of subscribe methods' do - handler = TestHandler.new - result = subscriptions.add_subscription(handler, [Test1DomainEvent, Test2DomainEvent]) + it "returns lambda as an output of subscribe methods" do + handler = TestHandler.new + result = subscriptions.add_subscription(handler, [Test1DomainEvent, Test2DomainEvent]) expect(result).to respond_to(:call) end - it 'revokes global subscription' do - handler = TestHandler.new + it "revokes global subscription" do + handler = TestHandler.new - revoke = subscriptions.add_global_subscription(handler) - expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler]) - expect(subscriptions.all_for('Test2DomainEvent')).to eq([handler]) + revoke = subscriptions.add_global_subscription(handler) + expect(subscriptions.all_for("Test1DomainEvent")).to eq([handler]) + expect(subscriptions.all_for("Test2DomainEvent")).to eq([handler]) revoke.() - expect(subscriptions.all_for('Test1DomainEvent')).to eq([]) - expect(subscriptions.all_for('Test2DomainEvent')).to eq([]) + expect(subscriptions.all_for("Test1DomainEvent")).to eq([]) + expect(subscriptions.all_for("Test2DomainEvent")).to eq([]) end - it 'revokes subscription' do - handler = TestHandler.new + it "revokes subscription" do + handler = TestHandler.new - revoke = subscriptions.add_subscription(handler, [Test1DomainEvent, Test2DomainEvent]) - expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler]) - expect(subscriptions.all_for('Test2DomainEvent')).to eq([handler]) + revoke = subscriptions.add_subscription(handler, %w[Test1DomainEvent Test2DomainEvent]) + expect(subscriptions.all_for("Test1DomainEvent")).to eq([handler]) + expect(subscriptions.all_for("Test2DomainEvent")).to eq([handler]) revoke.() - expect(subscriptions.all_for('Test1DomainEvent')).to eq([]) - expect(subscriptions.all_for('Test2DomainEvent')).to eq([]) + expect(subscriptions.all_for("Test1DomainEvent")).to eq([]) + expect(subscriptions.all_for("Test2DomainEvent")).to eq([]) end - it 'revokes thread global subscription' do - handler = TestHandler.new + it "revokes thread global subscription" do + handler = TestHandler.new - revoke = subscriptions.add_thread_global_subscription(handler) - expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler]) - expect(subscriptions.all_for('Test2DomainEvent')).to eq([handler]) + revoke = subscriptions.add_thread_global_subscription(handler) + expect(subscriptions.all_for("Test1DomainEvent")).to eq([handler]) + expect(subscriptions.all_for("Test2DomainEvent")).to eq([handler]) revoke.() - expect(subscriptions.all_for('Test1DomainEvent')).to eq([]) - expect(subscriptions.all_for('Test2DomainEvent')).to eq([]) + expect(subscriptions.all_for("Test1DomainEvent")).to eq([]) + expect(subscriptions.all_for("Test2DomainEvent")).to eq([]) end - it 'revokes thread subscription' do - handler = TestHandler.new + it "revokes thread subscription" do + handler = TestHandler.new - revoke = subscriptions.add_thread_subscription(handler, [Test1DomainEvent, Test2DomainEvent]) - expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler]) - expect(subscriptions.all_for('Test2DomainEvent')).to eq([handler]) + revoke = subscriptions.add_thread_subscription(handler, %w[Test1DomainEvent Test2DomainEvent]) + expect(subscriptions.all_for("Test1DomainEvent")).to eq([handler]) + expect(subscriptions.all_for("Test2DomainEvent")).to eq([handler]) revoke.() - expect(subscriptions.all_for('Test1DomainEvent')).to eq([]) - expect(subscriptions.all_for('Test2DomainEvent')).to eq([]) + expect(subscriptions.all_for("Test1DomainEvent")).to eq([]) + expect(subscriptions.all_for("Test2DomainEvent")).to eq([]) end - it 'subscribes by type of event which is a String' do - handler = TestHandler.new + it "subscribes by type of event which is a String" do + handler = TestHandler.new subscriptions.add_subscription(handler, ["Test1DomainEvent"]) subscriptions.add_thread_subscription(handler, ["Test1DomainEvent"]) - expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler, handler]) + expect(subscriptions.all_for("Test1DomainEvent")).to eq([handler, handler]) end - it 'subscribes by type of event which is a class' do - handler = TestHandler.new - subscriptions.add_subscription(handler, [Test1DomainEvent]) - subscriptions.add_thread_subscription(handler, [Test1DomainEvent]) + it "subscribes by type of event which is a class" do + handler = TestHandler.new + subscriptions.add_subscription(handler, ["Test1DomainEvent"]) + subscriptions.add_thread_subscription(handler, ["Test1DomainEvent"]) - expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler, handler]) + expect(subscriptions.all_for("Test1DomainEvent")).to eq([handler, handler]) end end diff --git a/ruby_event_store/lib/ruby_event_store/subscriptions.rb b/ruby_event_store/lib/ruby_event_store/subscriptions.rb index 7902eb02c0..3870c925dd 100644 --- a/ruby_event_store/lib/ruby_event_store/subscriptions.rb +++ b/ruby_event_store/lib/ruby_event_store/subscriptions.rb @@ -1,18 +1,18 @@ # frozen_string_literal: true -require 'concurrent' +require "concurrent" module RubyEventStore class Subscriptions def initialize(event_type_resolver: default_event_type_resolver) @event_type_resolver = event_type_resolver - @local = LocalSubscriptions.new + @local = LocalSubscriptions.new @global = GlobalSubscriptions.new @thread = ThreadSubscriptions.new end def add_subscription(subscriber, event_types) - local.add(subscriber, resolve_event_types(event_types)) + local.add(subscriber, event_types) end def add_global_subscription(subscriber) @@ -20,7 +20,7 @@ def add_global_subscription(subscriber) end def add_thread_subscription(subscriber, event_types) - thread.local.add(subscriber, resolve_event_types(event_types)) + thread.local.add(subscriber, event_types) end def add_thread_global_subscription(subscriber) @@ -28,46 +28,39 @@ def add_thread_global_subscription(subscriber) end def all_for(event_type) - [local, global, thread].map{|r| r.all_for(event_type)}.reduce(&:+) + [local, global, thread].map { |r| r.all_for(event_type) }.reduce(&:+) end attr_reader :event_type_resolver private + attr_reader :local, :global, :thread def default_event_type_resolver ->(value) { value.to_s } end - def resolve_event_types(event_types) - event_types.map(&method(:resolve_event_type)) - end - - def resolve_event_type(type) - @event_type_resolver.call(type) - end - class ThreadSubscriptions def initialize - @local = ThreadLocalSubscriptions.new + @local = ThreadLocalSubscriptions.new @global = ThreadGlobalSubscriptions.new end attr_reader :local, :global def all_for(event_type) - [global, local].map{|r| r.all_for(event_type)}.reduce(&:+) + [global, local].map { |r| r.all_for(event_type) }.reduce(&:+) end end class LocalSubscriptions def initialize - @subscriptions = Hash.new {|hsh, key| hsh[key] = [] } + @subscriptions = Hash.new { |hsh, key| hsh[key] = [] } end def add(subscription, event_types) - event_types.each{ |type| @subscriptions[type] << subscription } - ->() {event_types.each{ |type| @subscriptions.fetch(type).delete(subscription) } } + event_types.each { |type| @subscriptions[type] << subscription } + -> { event_types.each { |type| @subscriptions.fetch(type).delete(subscription) } } end def all_for(event_type) @@ -82,7 +75,7 @@ def initialize def add(subscription) @subscriptions << subscription - ->() { @subscriptions.delete(subscription) } + -> { @subscriptions.delete(subscription) } end def all_for(_event_type) @@ -92,14 +85,12 @@ def all_for(_event_type) class ThreadLocalSubscriptions def initialize - @subscriptions = Concurrent::ThreadLocalVar.new do - Hash.new {|hsh, key| hsh[key] = [] } - end + @subscriptions = Concurrent::ThreadLocalVar.new { Hash.new { |hsh, key| hsh[key] = [] } } end def add(subscription, event_types) - event_types.each{ |type| @subscriptions.value[type] << subscription } - ->() {event_types.each{ |type| @subscriptions.value.fetch(type).delete(subscription) } } + event_types.each { |type| @subscriptions.value[type] << subscription } + -> { event_types.each { |type| @subscriptions.value.fetch(type).delete(subscription) } } end def all_for(event_type) @@ -114,7 +105,7 @@ def initialize def add(subscription) @subscriptions.value += [subscription] - ->() { @subscriptions.value -= [subscription] } + -> { @subscriptions.value -= [subscription] } end def all_for(_event_type)