From 5d49196d11a588edaddbbac4125080676699050c Mon Sep 17 00:00:00 2001 From: Montana Mendy Date: Thu, 24 Feb 2022 04:11:40 -0800 Subject: [PATCH] Added option to convert values into binaries, with fallbacks. --- include/eredis.hrl | 174 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 153 insertions(+), 21 deletions(-) diff --git a/include/eredis.hrl b/include/eredis.hrl index e5221f15..4d93e044 100644 --- a/include/eredis.hrl +++ b/include/eredis.hrl @@ -1,31 +1,163 @@ -%% Public types +%% +%% Erlang Redis client +%% +%% Usage: +%% {ok, Client} = eredis:start_link(). +%% {ok, <<"OK">>} = eredis:q(Client, ["SET", "foo", "bar"]). +%% {ok, <<"bar">>} = eredis:q(Client, ["GET", "foo"]). --type reconnect_sleep() :: no_reconnect | integer(). +-module(eredis). +-include("eredis.hrl"). --type option() :: {host, string()} | {port, integer()} | {database, string()} | {password, string()} | {reconnect_sleep, reconnect_sleep()}. --type server_args() :: [option()]. +%% Default timeout for calls to the client gen_server +%% Specified in http://www.erlang.org/doc/man/gen_server.html#call-3 +-define(TIMEOUT, 5000). --type return_value() :: undefined | binary() | [binary()]. +-export([start_link/0, start_link/1, start_link/2, start_link/3, start_link/4, + start_link/5, start_link/6, start_link/7, stop/1, q/2, q/3, qp/2, qp/3, q_noreply/2, + q_async/2, q_async/3]). --type pipeline() :: [iolist()]. +%% Exported for testing +-export([create_multibulk/1]). --type channel() :: binary(). +%% Type of gen_server process id +-type client() :: pid() | + atom() | + {atom(),atom()} | + {global,term()} | + {via,atom(),term()}. -%% Continuation data is whatever data returned by any of the parse -%% functions. This is used to continue where we left off the next time -%% the user calls parse/2. --type continuation_data() :: any(). --type parser_state() :: status_continue | bulk_continue | multibulk_continue. +%% +%% PUBLIC API +%% -%% Internal parser state. Is returned from parse/2 and must be -%% included on the next calls to parse/2. --record(pstate, { - state = undefined :: parser_state() | undefined, - continuation_data :: continuation_data() | undefined -}). +start_link() -> + start_link("127.0.0.1", 6379, 0, ""). --define(NL, "\r\n"). +start_link(Host, Port) -> + start_link(Host, Port, 0, ""). --define(SOCKET_OPTS, [binary, {active, once}, {packet, raw}, {reuseaddr, true}]). +start_link(Host, Port, Database) -> + start_link(Host, Port, Database, ""). --define(RECV_TIMEOUT, 5000). +start_link(Host, Port, Database, Password) -> + start_link(Host, Port, Database, Password, 100). + +start_link(Host, Port, Database, Password, ReconnectSleep) -> + start_link(Host, Port, Database, Password, ReconnectSleep, ?TIMEOUT). + +start_link(Host, Port, Database, Password, ReconnectSleep, ConnectTimeout) -> + start_link(Host, Port, Database, Password, ReconnectSleep, ConnectTimeout, []). + +start_link(Host, Port, Database, Password, ReconnectSleep, ConnectTimeout, SocketOptions) + when is_list(Host) orelse + (is_tuple(Host) andalso tuple_size(Host) =:= 2 andalso element(1, Host) =:= local), + is_integer(Port), + is_integer(Database) orelse Database == undefined, + is_list(Password), + is_integer(ReconnectSleep) orelse ReconnectSleep =:= no_reconnect, + is_integer(ConnectTimeout), + is_list(SocketOptions) -> + + eredis_client:start_link(Host, Port, Database, Password, + ReconnectSleep, ConnectTimeout, SocketOptions). + +%% @doc: Callback for starting from poolboy +-spec start_link(server_args()) -> {ok, Pid::pid()} | {error, Reason::term()}. +start_link(Args) -> + Host = proplists:get_value(host, Args, "127.0.0.1"), + Port = proplists:get_value(port, Args, 6379), + Database = proplists:get_value(database, Args, 0), + Password = proplists:get_value(password, Args, ""), + ReconnectSleep = proplists:get_value(reconnect_sleep, Args, 100), + ConnectTimeout = proplists:get_value(connect_timeout, Args, ?TIMEOUT), + SocketOptions = proplists:get_value(socket_options, Args, []), + + start_link(Host, Port, Database, Password, ReconnectSleep, ConnectTimeout, SocketOptions). + +stop(Client) -> + eredis_client:stop(Client). + +-spec q(Client::client(), Command::[any()]) -> + {ok, return_value()} | {error, Reason::binary() | no_connection}. +%% @doc: Executes the given command in the specified connection. The +%% command must be a valid Redis command and may contain arbitrary +%% data which will be converted to binaries. The returned values will +%% always be binaries. +q(Client, Command) -> + call(Client, Command, ?TIMEOUT). + +q(Client, Command, Timeout) -> + call(Client, Command, Timeout). + + +-spec qp(Client::client(), Pipeline::pipeline()) -> + [{ok, return_value()} | {error, Reason::binary()}] | + {error, no_connection}. +%% @doc: Executes the given pipeline (list of commands) in the +%% specified connection. The commands must be valid Redis commands and +%% may contain arbitrary data which will be converted to binaries. The +%% values returned by each command in the pipeline are returned in a list. +qp(Client, Pipeline) -> + pipeline(Client, Pipeline, ?TIMEOUT). + +qp(Client, Pipeline, Timeout) -> + pipeline(Client, Pipeline, Timeout). + +-spec q_noreply(Client::client(), Command::[any()]) -> ok. +%% @doc Executes the command but does not wait for a response and ignores any errors. +%% @see q/2 +q_noreply(Client, Command) -> + cast(Client, Command). + +-spec q_async(Client::client(), Command::[any()]) -> ok. +% @doc Executes the command, and sends a message to this process with the response (with either error or success). Message is of the form `{response, Reply}', where `Reply' is the reply expected from `q/2'. +q_async(Client, Command) -> + q_async(Client, Command, self()). + +-spec q_async(Client::client(), Command::[any()], Pid::pid()|atom()) -> ok. +%% @doc Executes the command, and sends a message to `Pid' with the response (with either or success). +%% @see 1_async/2 +q_async(Client, Command, Pid) when is_pid(Pid) -> + Request = {request, create_multibulk(Command), Pid}, + gen_server:cast(Client, Request). + +%% +%% INTERNAL HELPERS +%% + +call(Client, Command, Timeout) -> + Request = {request, create_multibulk(Command)}, + gen_server:call(Client, Request, Timeout). + +pipeline(_Client, [], _Timeout) -> + []; +pipeline(Client, Pipeline, Timeout) -> + Request = {pipeline, [create_multibulk(Command) || Command <- Pipeline]}, + gen_server:call(Client, Request, Timeout). + +cast(Client, Command) -> + Request = {request, create_multibulk(Command)}, + gen_server:cast(Client, Request). + +-spec create_multibulk(Args::[any()]) -> Command::iolist(). +%% @doc: Creates a multibulk command with all the correct size headers +create_multibulk(Args) -> + ArgCount = [<<$*>>, integer_to_list(length(Args)), <>], + ArgsBin = lists:map(fun to_bulk/1, lists:map(fun to_binary/1, Args)), + + [ArgCount, ArgsBin]. + +to_bulk(B) when is_binary(B) -> + [<<$$>>, integer_to_list(iolist_size(B)), <>, B, <>]. + +%% @doc: Convert given value to binary. Fallbacks to +%% term_to_binary/1. For floats, throws {cannot_store_floats, Float} +%% as we do not want floats to be stored in Redis. Your future self +%% will thank you for this. +to_binary(X) when is_list(X) -> list_to_binary(X); +to_binary(X) when is_atom(X) -> atom_to_binary(X, utf8); +to_binary(X) when is_binary(X) -> X; +to_binary(X) when is_integer(X) -> integer_to_binary(X); +to_binary(X) when is_float(X) -> throw({cannot_store_floats, X}); +to_binary(X) -> term_to_binary(X).