Skip to content

Commit

Permalink
Merge pull request #52 from HernanRivasAcosta/hernan.removed_lager
Browse files Browse the repository at this point in the history
Removed Lager
  • Loading branch information
HernanRivasAcosta committed Aug 8, 2021
2 parents a9f51ec + 47f72d5 commit f1ccd82
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 73 deletions.
6 changes: 2 additions & 4 deletions Emakefile
@@ -1,5 +1,4 @@
{"src/*", [{parse_transform, lager_transform},
warn_unused_vars,
{"src/*", [warn_unused_vars,
warn_export_all,
warn_shadow_vars,
warn_unused_import,
Expand All @@ -15,8 +14,7 @@
debug_info,
{outdir, "/ebin"},
{i, "include"}]}.
{"test/*", [{parse_transform, lager_transform},
warn_unused_vars,
{"test/*", [warn_unused_vars,
warn_export_all,
warn_shadow_vars,
warn_unused_import,
Expand Down
8 changes: 2 additions & 6 deletions README.md
@@ -1,11 +1,9 @@
kafkerl v2.1.0
==============
[![Gitter](https://badges.gitter.im/Join Chat.svg)](https://gitter.im/HernanRivasAcosta/kafkerl?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)

Apache Kafka 0.8.2 high performance producer/consumer for erlang.
Developed thanks to the support and sponsorship of [TigerText](http://www.tigertext.com/) and [Inaka](https://github.com/inaka/).

##Features (aka, why kafkerl?)
## Features (aka, why kafkerl?)
- Fast binary creation.
- Caching requests to build more optimally compressed multi message TCP packages.
- Highly concurrent, using @jaynel concurrency tools.
Expand All @@ -14,10 +12,8 @@ Developed thanks to the support and sponsorship of [TigerText](http://www.tigert
- Flexible API allows consumer of messages to define pids, funs or M:F pairs as callbacks for the received messages.
- Simple yet flexible consumer API to retrieve the messages from Kafka.

##Missing features (aka, what I am working on but haven't finished yet)
## Missing features (aka, what I am working on but haven't finished yet)
- There is no communication with Zookeeper.
- Tests suites.



Special thanks to [@nitzanharel](https://github.com/nitzanharel) who found some really nasty bugs and helped me understand the subtleties of kafka's design and to the rest of the [TigerText](http://www.tigertext.com/) and [Inaka](https://github.com/inaka/) teams for their support and code reviews.
4 changes: 1 addition & 3 deletions config/sys.config
@@ -1,6 +1,4 @@
[{lager, [{colored, true},
{handlers, [{lager_console_backend, [debug,true]}]}]},
{kafkerl, [%{gen_server_name, kafkerl_client},
[{kafkerl, [%{gen_server_name, kafkerl_client},
{disabled, false},
{conn_config, [{brokers, [{"localhost", 9090},
{"localhost", 9091},
Expand Down
9 changes: 3 additions & 6 deletions rebar.config
Expand Up @@ -4,8 +4,7 @@
%% == Erlang Compiler ==

%% Erlang compiler options
{erl_opts, [ {parse_transform, lager_transform} %%
, warn_unused_vars
{erl_opts, [ warn_unused_vars
, warnings_as_errors %
, warn_export_all
, warn_shadow_vars
Expand Down Expand Up @@ -67,10 +66,8 @@

%% == Dependencies ==

{deps, [ {parse_trans, "3.0.0"}
, {lager, "3.2.1"}
, {epocxy, "1.1.0"}
, {validerl, "2.0.1"}
{deps, [ {epocxy, {git, "https://github.com/duomark/epocxy", {tag, "1.1.0"}}}
, {validerl, {git, "https://github.com/HernanRivasAcosta/validerl", {branch, "master"}}}
]}.

%% == Dialyzer ==
Expand Down
17 changes: 12 additions & 5 deletions rebar.lock
@@ -1,5 +1,12 @@
[{<<"epocxy">>,{pkg,<<"epocxy">>,<<"1.1.0">>},0},
{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.8">>},1},
{<<"lager">>,{pkg,<<"lager">>,<<"3.2.1">>},0},
{<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.0.0">>},0},
{<<"validerl">>,{pkg,<<"validerl">>,<<"2.0.1">>},0}].
[{<<"epocxy">>,
{git,"https://github.com/duomark/epocxy",
{ref,"665d79dfdb78df036bac3b3cb6db55e78406880e"}},
0},
{<<"proper">>,
{git,"https://github.com/manopapad/proper",
{ref,"9f6a6501430479bed66d08cd795cd34d36ec83aa"}},
1},
{<<"validerl">>,
{git,"https://github.com/HernanRivasAcosta/validerl",
{ref,"c2bb4c3ce83ce01a8004afe01bf226d052d8a5dd"}},
0}].
2 changes: 1 addition & 1 deletion relx.config
@@ -1,3 +1,3 @@
{release, {kafkerl, "2.1.0"}, [kafkerl]}.
{release, {kafkerl, "2.1.1"}, [kafkerl]}.
{sys_config, "config/sys.config"}.
{extended_start_script, true}.
5 changes: 2 additions & 3 deletions src/kafkerl.app.src
@@ -1,8 +1,8 @@
{ application
, kafkerl
, [ {description, "Apache Kafka 0.8.2 high performance producer/consumer for erlang."}
, {vsn, "2.1.0"}
, {applications, [kernel, validerl, epocxy, lager, stdlib]}
, {vsn, "2.1.1"}
, {applications, [kernel, validerl, epocxy, stdlib]}
, {mod, {kafkerl_app, []}}
, {modules, [ kafkerl
, kafkerl_app
Expand All @@ -16,6 +16,5 @@
, {licenses, ["Apache Licence 2.0"]}
, {links, [{"Github", "https://github.com/hernanrivasacosta/kafkerl"}]}
, {build_tools, ["rebar3"]}
, {lager, [{handlers, [{lager_console_backend, info}]}]}
]
}.
40 changes: 21 additions & 19 deletions src/kafkerl_broker_connection.erl
Expand Up @@ -3,6 +3,8 @@

-behaviour(gen_server).

-include_lib("kernel/include/logger.hrl").

%% API
-export([add_buffer/2, clear_buffers/1, fetch/4, stop_fetch/3]).
% Only for internal use
Expand Down Expand Up @@ -103,7 +105,7 @@ handle_info(connection_timeout, State) ->
{stop, {error, unable_to_connect}, State};
handle_info({tcp_closed, _Socket}, State = #state{name = Name,
address = {Host, Port}}) ->
_ = lager:warning("~p lost connection to ~p:~p", [Name, Host, Port]),
ok = ?LOG_WARNING("~p lost connection to ~p:~p", [Name, Host, Port]),
NewState = handle_tcp_close(State),
{noreply, NewState};
handle_info({tcp, _Socket, Bin}, State) ->
Expand All @@ -115,7 +117,7 @@ handle_info({flush, Time}, State) ->
{ok, _Tref} = queue_flush(Time),
handle_flush(State);
handle_info(Msg, State = #state{name = Name}) ->
_ = lager:notice("~p got unexpected info message: ~p on ~p", [Name, Msg]),
ok = ?LOG_NOTICE("~p got unexpected info message: ~p on ~p", [Name, Msg]),
{noreply, State}.

% Boilerplate
Expand Down Expand Up @@ -152,7 +154,7 @@ init([Id, Connector, Address, Config, Name]) ->
{ok, State};
{errors, Errors} ->
ok = lists:foreach(fun(E) ->
_ = lager:critical("configuration error: ~p", [E])
ok = ?LOG_CRITICAL("configuration error: ~p", [E])
end, Errors),
{stop, bad_config}
end.
Expand All @@ -175,17 +177,17 @@ handle_flush(State = #state{socket = Socket, ets = EtsName, buffers = Buffers,
ClientId,
CorrelationId),
true = ets:insert_new(EtsName, {CorrelationId, MergedMessages}),
_ = lager:debug("~p sending ~p", [Name, Request]),
ok = ?LOG_DEBUG("~p sending ~p", [Name, Request]),
case gen_tcp:send(Socket, Request) of
{error, Reason} ->
_ = lager:critical("~p was unable to write to socket, reason: ~p",
ok = ?LOG_CRITICAL("~p was unable to write to socket, reason: ~p",
[Name, Reason]),
gen_tcp:close(Socket),
ets:delete_all_objects(EtsName),
ok = resend_messages(MergedMessages, Connector),
{noreply, handle_tcp_close(NewState)};
ok ->
_ = lager:debug("~p sent message ~p", [Name, CorrelationId]),
ok = ?LOG_DEBUG("~p sent message ~p", [Name, CorrelationId]),
{noreply, NewState}
end
end.
Expand All @@ -200,7 +202,7 @@ handle_fetch(ServerRef, Topic, Partition, Options,
Scheduled} of
% An scheduled fetch we can't identify? We ignore it
{_, false, true} ->
lager:warning("ignoring unknown scheduled fetch"),
?LOG_WARNING("ignoring unknown scheduled fetch"),
{reply, ok, State};
% We are already fetching that topic/partition pair
{#fetch{}, _, false} ->
Expand All @@ -223,12 +225,12 @@ handle_fetch(ServerRef, Topic, Partition, Options,
MinBytes),
case gen_tcp:send(Socket, Payload) of
{error, Reason} ->
_ = lager:critical("~p was unable to write to socket, reason: ~p",
ok = ?LOG_CRITICAL("~p was unable to write to socket, reason: ~p",
[Name, Reason]),
ok = gen_tcp:close(Socket),
{reply, {error, no_connection}, handle_tcp_close(State)};
ok ->
_ = lager:debug("~p sent request ~p", [Name, CorrelationId]),
ok = ?LOG_DEBUG("~p sent request ~p", [Name, CorrelationId]),
NewFetch = #fetch{correlation_id = CorrelationId,
server_ref = ServerRef,
topic = Topic,
Expand Down Expand Up @@ -366,12 +368,12 @@ handle_produce_response(Bin, State = #state{connector = Connector, name = Name,
{ok, State}
end;
_ ->
_ = lager:warning("~p was unable to get produce response", [Name]),
ok = ?LOG_WARNING("~p was unable to get produce response", [Name]),
{error, invalid_produce_response}
end
catch
_:Er ->
_ = lager:critical("~p got unexpected response when parsing message: ~p",
ok = ?LOG_CRITICAL("~p got unexpected response when parsing message: ~p",
[Name, Er]),
{ok, State}
end.
Expand Down Expand Up @@ -441,7 +443,7 @@ handle_error({Topic, Partition, Error}, Messages, Name)
end;
handle_error({Topic, Partition, Error}, _Messages, Name) ->
ErrorName = kafkerl_error:get_error_name(Error),
_ = lager:error("~p was unable to handle ~p error on topic ~p, partition ~p",
ok = ?LOG_ERROR("~p was unable to handle ~p error on topic ~p, partition ~p",
[Name, ErrorName, Topic, Partition]),
false.

Expand All @@ -461,8 +463,8 @@ get_message_for_error(Topic, Partition, SavedMessages, Name) ->
end.

print_error(Name, Topic, Partition) ->
_ = lager:error("~p found no messages for topic ~p, partition ~p",
[Name, Topic, Partition]).
ok = ?LOG_ERROR("~p found no messages for topic ~p, partition ~p",
[Name, Topic, Partition]).

-spec connect(pid(),
atom(),
Expand All @@ -471,18 +473,18 @@ print_error(Name, Topic, Partition) ->
any(),
any()) -> any().
connect(Pid, Name, _TCPOpts, {Host, Port} = _Address, _Timeout, 0) ->
_ = lager:error("~p was unable to connect to ~p:~p", [Name, Host, Port]),
ok = ?LOG_ERROR("~p was unable to connect to ~p:~p", [Name, Host, Port]),
Pid ! connection_timeout;
connect(Pid, Name, TCPOpts, {Host, Port} = Address, Timeout, Retries) ->
_ = lager:debug("~p attempting connection to ~p:~p", [Name, Host, Port]),
ok = ?LOG_DEBUG("~p attempting connection to ~p:~p", [Name, Host, Port]),
case gen_tcp:connect(Host, Port, TCPOpts, 5000) of
{ok, Socket} ->
_ = lager:debug("~p connnected to ~p:~p", [Name, Host, Port]),
ok = ?LOG_DEBUG("~p connnected to ~p:~p", [Name, Host, Port]),
ok = gen_tcp:controlling_process(Socket, Pid),
Pid ! {connected, Socket};
{error, Reason} ->
NewRetries = Retries - 1,
_ = lager:warning("~p unable to connect to ~p:~p. Reason: ~p
ok = ?LOG_WARNING("~p unable to connect to ~p:~p. Reason: ~p
(~p retries left)",
[Name, Host, Port, Reason, NewRetries]),
timer:sleep(Timeout),
Expand All @@ -507,7 +509,7 @@ get_messages_from(Ets, Retries) ->
_Error when Retries > 0 ->
get_messages_from(Ets, Retries - 1);
_Error ->
_ = lager:warning("giving up on reading from the ETS buffer"),
ok = ?LOG_WARNING("giving up on reading from the ETS buffer"),
[]
end.

Expand Down

0 comments on commit f1ccd82

Please sign in to comment.