Skip to content

Commit 961431d

Browse files
authored
Bulk requests via post (#12)
* Add intial support for post * Add value validations for structured event (#10) * Add emitter for bulk and lone requests * Add processor module to send events * Fix scheduler for bulk processor * Add cache module * Add placeholder files for test cases * Update travis CI version * Refactor application.ex and rename bulk.ex * Add tests for cache module * Add tests * Remove unused variables * Add try catch * Remove tab file when deleting ETS table * Move delete to cache module * Comment cache test cases * Add cache test * Add test cases for set_lock * Add tests for release_lock * Add check_lock test case * Cleanup tests * Add more tests * Fix cache issue * Add more tests * Fix failing test cases * Fix error * Add alias * Remove unused alias
1 parent 45634b2 commit 961431d

27 files changed

+628
-59
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,6 @@ erl_crash.dump
2121

2222
# Ignore package tarball (built via "mix hex.build").
2323
snowplow_elixir_tracker-*.tar
24+
25+
# Ignore elixir tables created using PersistentEts
26+
*.tab

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ otp_release: '17.4'
66

77
matrix:
88
include:
9-
- elixir: '1.6'
9+
- elixir: '1.8'
1010
otp_release: '18.0'
1111
script:
1212
- MIX_ENV=test mix do compile, coveralls.json

config/config.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,5 @@ config :snowplow_tracker,
3333
default_options: [
3434
timeout: 5000,
3535
recv_timeout: 2000
36-
]
36+
],
37+
table: :snowplow_events

config/test.exs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
use Mix.Config
2+
3+
config :snowplow_tracker,
4+
default_options: [
5+
timeout: 5000,
6+
recv_timeout: 2000
7+
],
8+
table: :snowplow_events_test

lib/application.ex

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
defmodule SnowplowTracker.Application do
2+
use Application
3+
4+
def start(_type, _args) do
5+
children =
6+
if Mix.env() != :test do
7+
[
8+
{SnowplowTracker.Emitters.Server, []}
9+
]
10+
else
11+
[]
12+
end
13+
14+
Supervisor.start_link(children, strategy: :one_for_all)
15+
end
16+
end

lib/snowplow_tracker/constants.ex

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ defmodule SnowplowTracker.Constants do
1212
@post_protocol_vendor "com.snowplowanalytics.snowplow"
1313
@post_protocol_version "tp2"
1414
@post_content_type "application/json; charset=utf-8"
15+
@post_wrapper_bytes 88
16+
@post_stm_bytes 22
1517
@get_protocol_path "i"
1618
@schema_payload_data "iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4"
1719
@schema_contexts "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1"
@@ -85,6 +87,8 @@ defmodule SnowplowTracker.Constants do
8587
def post_protocol_vendor, do: @post_protocol_vendor
8688
def post_protocol_version, do: @post_protocol_version
8789
def post_content_type, do: @post_content_type
90+
def post_wrapper_bytes, do: @post_wrapper_bytes
91+
def post_stm_bytes, do: @post_stm_bytes
8892

8993
# GET Requests
9094
def get_protocol_path, do: @get_protocol_path

lib/snowplow_tracker/emitter.ex

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,33 +4,35 @@ defmodule SnowplowTracker.Emitter do
44
snowplow collector.
55
"""
66
alias __MODULE__
7-
8-
alias SnowplowTracker.{Payload, Request, Response, Errors}
7+
alias SnowplowTracker.Payload
98
alias SnowplowTracker.Emitters.Helper
9+
alias SnowplowTracker.Emitters.Unary, as: UnaryEmitter
10+
alias SnowplowTracker.Emitters.Bulk , as: BulkEmitter
1011

1112
@keys [
1213
collector_uri: "localhost",
1314
request_type: "GET",
1415
collector_port: nil,
1516
protocol: "http"
1617
]
17-
18-
defstruct @keys
19-
2018
@type t :: %__MODULE__{
2119
collector_uri: String.t(),
2220
request_type: String.t(),
2321
collector_port: number(),
2422
protocol: String.t()
2523
}
2624

25+
defstruct @keys
26+
2727
# Public API
2828

2929
@spec new(Emitter.t()) :: Emitter.t()
3030
def new(uri), do: struct(%Emitter{}, collector_uri: uri)
3131

3232
@spec input(Payload.t(), Emitter.t(), struct()) :: {:ok, String.t()} | no_return()
33-
def input(%Payload{} = payload, %Emitter{} = emitter, module \\ Helper) do
33+
def input(payload, emitter, module \\ Helper)
34+
35+
def input(%Payload{} = payload, %Emitter{request_type: "GET"} = emitter, module) do
3436
url =
3537
module.generate_endpoint(
3638
emitter.protocol,
@@ -40,16 +42,19 @@ defmodule SnowplowTracker.Emitter do
4042
emitter.request_type
4143
)
4244

43-
with {:ok, response} <- Request.get(url, [], default_options()),
44-
{:ok, body} <- Response.parse(response) do
45-
{:ok, body}
46-
else
47-
{:error, error} ->
48-
raise Errors.ApiError, Kernel.inspect(error)
49-
end
45+
UnaryEmitter.create(payload, url)
5046
end
5147

52-
defp default_options do
53-
Application.get_env(:snowplow_tracker, :default_options) || []
48+
def input(%Payload{} = payload, %Emitter{request_type: "POST"} = emitter, module) do
49+
url =
50+
module.generate_endpoint(
51+
emitter.protocol,
52+
emitter.collector_uri,
53+
emitter.collector_port,
54+
payload,
55+
emitter.request_type
56+
)
57+
58+
BulkEmitter.create(payload, url)
5459
end
5560
end

lib/snowplow_tracker/emitters/bulk.ex

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
defmodule SnowplowTracker.Emitters.Bulk do
2+
@moduledoc """
3+
This module defines the emitter that will buffer the events to be sent via
4+
POST requests.
5+
"""
6+
7+
alias SnowplowTracker.Payload
8+
alias SnowplowTracker.Emitters.Cache
9+
10+
@table Application.get_env(:snowplow_tracker, :table)
11+
12+
def create(payload, url, table \\ @table) do
13+
eid =
14+
payload
15+
|> Payload.get()
16+
|> Map.fetch!("eid")
17+
18+
Cache.insert({eid, payload, url}, table)
19+
{:ok, :success}
20+
end
21+
end
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
defmodule SnowplowTracker.Emitters.Cache do
2+
@moduledoc """
3+
Module which implements functions to interact with ETS
4+
"""
5+
6+
require Logger
7+
8+
alias :ets, as: Ets
9+
10+
@table Application.get_env(:snowplow_tracker, :table)
11+
@path Application.get_env(:snowplow_tracker, :path, ".")
12+
@lock "lock"
13+
14+
def init(table \\ @table) do
15+
PersistentEts.new(
16+
table,
17+
"#{@path}/#{Atom.to_string(table)}.tab",
18+
[:public, :set, :named_table, {:read_concurrency, true}]
19+
)
20+
21+
Logger.log(:debug, "Table #{table} created successfully!")
22+
{:ok, :success}
23+
end
24+
25+
def insert(payload, table \\ @table) do
26+
{
27+
:ok,
28+
Ets.insert(table, payload)
29+
}
30+
end
31+
32+
def match(table \\ @table) do
33+
{
34+
:ok,
35+
Ets.match(table, {:"$1", :"$2", :"$3"})
36+
}
37+
end
38+
39+
def check_lock(table \\ @table) do
40+
{
41+
:ok,
42+
Ets.lookup(table, @lock)
43+
}
44+
end
45+
46+
# coveralls-ignore-start
47+
def release_lock(payload, table \\ @table)
48+
# coveralls-ignore-stop
49+
50+
def release_lock({:ok, _msg}, table), do: delete_key(@lock, table)
51+
52+
def release_lock({:error, _}, _table), do: :ok
53+
54+
def set_lock(table \\ @table) do
55+
try do
56+
Ets.insert(table, {@lock, true})
57+
Logger.log(:debug, "#{__MODULE__}: Lock set successfully!")
58+
{:ok, :success}
59+
rescue
60+
_e in ArgumentError ->
61+
Logger.log(:debug, "#{__MODULE__}: Falied to set lock!")
62+
{:error, :failed}
63+
end
64+
end
65+
66+
def delete_key(key, table \\ @table) do
67+
Ets.delete(table, key)
68+
Logger.log(:debug, "Key deleted: #{key}")
69+
{:ok, :success}
70+
end
71+
72+
def delete_table(table \\ @table) do
73+
Ets.delete(table)
74+
Logger.log(:debug, "Deleted table #{table}")
75+
File.rm("#{Atom.to_string(table)}.tab")
76+
end
77+
end

lib/snowplow_tracker/emitters/helper.ex

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,39 +4,51 @@ defmodule SnowplowTracker.Emitters.Helper do
44
necessary for the emitter module
55
"""
66

7-
alias SnowplowTracker.{Errors, Constants}
7+
alias SnowplowTracker.{Constants, Payload}
88

99
@get_method "GET"
10+
@post_method "POST"
11+
@get_path Constants.get_protocol_path()
12+
@post_vendor Constants.post_protocol_vendor()
13+
@post_version Constants.post_protocol_version()
1014

1115
# Public API
1216

1317
@doc """
1418
This function is used to generate the endpoint with the query parameters
1519
which are used to send events to the collector.
1620
"""
17-
def generate_endpoint(protocol, uri, nil = _port, payload, @get_method) do
18-
do_generate_endpoint(protocol, uri, "", payload)
21+
@spec generate_endpoint(String.t(), String.t(), String.t(), Payload.t(), String.t()) ::
22+
String.t()
23+
def generate_endpoint(protocol, uri, nil = _port, payload, request_method) do
24+
do_generate_endpoint(
25+
protocol,
26+
uri,
27+
"",
28+
payload,
29+
request_method
30+
)
1931
end
2032

21-
def generate_endpoint(protocol, uri, port, payload, @get_method) do
22-
do_generate_endpoint(protocol, uri, ":#{port}", payload)
23-
end
24-
25-
def generate_endpoint(_protocol, _uri, _port, _payload, invalid_method) do
26-
message = "#{invalid_method} method not implemented"
27-
raise Errors.NotImplemented, message
33+
def generate_endpoint(protocol, uri, port, payload, request_method) do
34+
do_generate_endpoint(
35+
protocol,
36+
uri,
37+
":#{port}",
38+
payload,
39+
request_method
40+
)
2841
end
2942

3043
# Private API
3144

3245
@doc false
33-
defp do_generate_endpoint(protocol, uri, port, payload) do
46+
defp do_generate_endpoint(protocol, uri, port, payload, @get_method) do
3447
params = URI.encode_query(payload)
35-
"#{protocol}://#{uri}#{port}/#{protocol_path()}?#{params}"
48+
"#{protocol}://#{uri}#{port}/#{@get_path}?#{params}"
3649
end
3750

38-
@doc false
39-
defp protocol_path do
40-
Constants.get_protocol_path()
51+
defp do_generate_endpoint(protocol, uri, port, _payload, @post_method) do
52+
"#{protocol}://#{uri}#{port}/#{@post_vendor}?#{@post_version}"
4153
end
4254
end

0 commit comments

Comments
 (0)