10 #ifndef CTRL_UTILS_REDIS_CLIENT_H_
11 #define CTRL_UTILS_REDIS_CLIENT_H_
13 #include <cpp_redis/cpp_redis>
20 #include <unordered_set>
23 #include "ctrl_utils/string.h"
24 #include "ctrl_utils/type_traits.h"
30 template <
typename... Ts>
31 using is_all_strings =
typename std::enable_if_t<
32 std::conjunction_v<std::is_convertible<Ts, std::string>...>>;
36 typename std::enable_if_t<std::is_convertible_v<T, std::string>>;
39 struct is_pair :
public std::false_type {};
41 template <
typename T,
typename S>
42 struct is_pair<
std::pair<T, S>> :
public std::true_type {};
44 template <
typename... Ts>
46 typename std::enable_if_t<std::conjunction_v<is_pair<Ts>...>>;
51 void connect(
const std::string& host =
"127.0.0.1",
size_t port = 6379,
52 const std::string& password =
"") {
55 cpp_redis::client::connect(host, port);
57 if (!password.empty()) {
58 const std::future<cpp_redis::reply> fut =
59 cpp_redis::client::auth(password);
60 cpp_redis::client::commit();
86 std::future<T>
get(
const std::string& key);
110 template <
typename T>
111 std::future<void>
get(
const std::string& key, T& val);
140 template <
typename T>
142 const std::string& key,
const std::function<
void(T&&)>& reply_callback,
143 const std::function<
void(
const std::string&)>& error_callback = {});
162 template <
typename T>
186 template <
typename T>
187 std::future<cpp_redis::reply>
set(
const std::string& key,
const T& value);
214 template <
typename T>
216 const reply_callback_t& reply_callback);
235 template <
typename T>
236 cpp_redis::reply
sync_set(
const std::string& key,
const T& value);
255 template <
class... Ts,
class... Strings,
256 typename = is_all_strings<Strings...>>
257 std::future<std::tuple<Ts...>>
mget(
const Strings&... keys);
276 template <
typename T>
277 std::future<std::vector<T>>
mget(
const std::vector<std::string>& keys);
292 template <
class... Ts,
class... Strings,
293 typename = is_all_strings<Strings...>>
310 template <
typename T>
311 std::vector<T>
sync_mget(
const std::vector<std::string>& keys);
337 template <
class... Pairs,
typename = is_all_pairs<Pairs...>>
338 std::future<cpp_redis::reply>
mset(
const Pairs&... key_vals);
365 template <
typename T>
366 std::future<cpp_redis::reply>
mset(
367 const std::vector<std::pair<std::string, T>>& key_vals);
398 template <
typename T>
399 RedisClient&
mset(
const std::vector<std::pair<std::string, T>>& key_vals,
400 const reply_callback_t& reply_callback);
415 template <
class... Pairs,
typename = is_all_pairs<Pairs...>>
416 cpp_redis::reply
sync_mset(
const Pairs&... key_vals);
438 template <
typename T>
440 const std::vector<std::pair<std::string, T>>& key_vals);
442 template <
typename T>
443 RedisClient& hset(
const std::string& key,
const std::string& field,
444 const T& value,
const reply_callback_t& reply_callback);
446 template <
typename T>
447 std::future<cpp_redis::reply> hset(
const std::string& key,
448 const std::string& field,
const T& value);
450 template <
typename T>
451 cpp_redis::reply sync_hset(
const std::string& key,
const std::string& field,
454 template <
typename T>
455 RedisClient& publish(
const std::string& key,
const T& value,
456 const reply_callback_t& reply_callback);
458 template <
typename T>
459 std::future<cpp_redis::reply> publish(
const std::string& key,
const T& value);
461 template <
typename T>
462 cpp_redis::reply sync_publish(
const std::string& key,
const T& value);
464 template <
typename TSub,
typename TPub>
465 RedisClient& request(
const std::string& key_pub,
const TPub& value_pub,
466 const std::string& key_sub,
467 std::function<
void(TSub&&)>&& sub_callback);
469 template <
typename TSub,
typename TPub>
470 std::future<TSub> request(
const std::string& key_pub,
const TPub& value_pub,
471 const std::string& key_sub);
473 template <
typename TSub,
typename TPub>
474 TSub sync_request(
const std::string& key_pub,
const TPub& value_pub,
475 const std::string& key_sub);
478 const std::string& pattern,
479 std::function<
void(std::unordered_set<std::string>&&)>&& callback);
481 std::future<std::unordered_set<std::string>> scan(
const std::string& pattern);
483 std::unordered_set<std::string> sync_scan(
const std::string& pattern);
486 template <
typename T>
487 static bool ReplyToString(
const cpp_redis::reply& reply, T& value);
489 template <
typename Tuple,
size_t... Is>
490 static void RepliesToTuple(
const std::vector<cpp_redis::reply>& replies,
491 Tuple& values, std::index_sequence<Is...>);
493 template <
typename Key,
typename Val,
typename = is_
string<Key>>
494 static bool KeyvalToString(
const std::pair<Key, Val>& key_val,
495 std::pair<std::string, std::string>& key_valstr);
497 template <
typename Tuple,
size_t... Is>
498 static void KeyvalsToString(
500 std::vector<std::pair<std::string, std::string>>& key_valstr,
501 std::index_sequence<Is...>);
504 size_t cursor,
const std::string& pattern,
505 std::unordered_set<std::string>&& keys,
506 std::function<
void(std::unordered_set<std::string>&&)>&& callback);
516 template <
typename T>
518 const std::string& key,
const std::function<
void(T&&)>& reply_callback,
519 const std::function<
void(
const std::string&)>& error_callback) {
520 send({
"GET", key}, [key, reply_callback,
521 error_callback](cpp_redis::reply& reply) {
522 if (!reply.is_string()) {
523 if (error_callback) {
525 "RedisClient::get(): Failed to get string value from key: " + key +
531 reply_callback(FromString<T>(reply.as_string()));
532 }
catch (
const std::exception& e) {
533 if (error_callback) {
534 error_callback(
"RedisClient::get(): Exception thrown on key: " + key +
543 template <
typename T>
545 auto promise = std::make_shared<std::promise<T>>();
547 key, [promise](T&& value) { promise->set_value(std::move(value)); },
548 [promise](
const std::string& error) {
549 promise->set_exception(
550 std::make_exception_ptr(std::runtime_error(error)));
552 return promise->get_future();
555 template <
typename T>
557 auto promise = std::make_shared<std::promise<void>>();
560 [promise, key, &value](std::string&& str_value) {
563 promise->set_value();
564 }
catch (
const std::exception& e) {
565 const std::string error =
566 "RedisClient::get(): Exception thrown on key: " + key +
"\n\t" +
568 promise->set_exception(
569 std::make_exception_ptr(std::runtime_error(error)));
572 [promise](
const std::string& error) {
573 promise->set_exception(
574 std::make_exception_ptr(std::runtime_error(error)));
576 return promise->get_future();
579 template <
typename T>
581 std::future<T> future = get<T>(key);
586 template <
typename T>
588 const reply_callback_t& reply_callback) {
589 send({
"SET", key, ToString(value)}, reply_callback);
593 template <
typename T>
596 auto promise = std::make_shared<std::promise<cpp_redis::reply>>();
597 set(key, value, [promise](cpp_redis::reply& reply) {
599 promise->set_value(reply);
601 promise->set_exception(
602 std::make_exception_ptr(std::runtime_error(reply.error())));
605 return promise->get_future();
608 template <
typename T>
610 std::future<cpp_redis::reply> future =
set(key, value);
615 template <
typename T>
616 bool RedisClient::ReplyToString(
const cpp_redis::reply& reply, T& value) {
621 template <
typename Tuple,
size_t... Is>
622 void RedisClient::RepliesToTuple(
const std::vector<cpp_redis::reply>& replies,
623 Tuple& values, std::index_sequence<Is...>) {
624 std::initializer_list<bool>{
625 ReplyToString(replies[Is], std::get<Is>(values))...};
628 template <
typename Key,
typename Val,
typename>
629 bool RedisClient::KeyvalToString(
630 const std::pair<Key, Val>& key_val,
631 std::pair<std::string, std::string>& key_valstr) {
632 key_valstr.first = key_val.first;
633 ToString(key_valstr.second, key_val.second);
637 template <
typename Tuple,
size_t... Is>
638 void RedisClient::KeyvalsToString(
640 std::vector<std::pair<std::string, std::string>>& key_valstr,
641 std::index_sequence<Is...>) {
642 std::initializer_list<bool>{
643 KeyvalToString(std::get<Is>(args), key_valstr[Is])...};
646 template <
class... Pairs,
typename>
648 constexpr
size_t num_pairs =
sizeof...(Pairs);
649 std::vector<std::pair<std::string, std::string>> key_valstr(num_pairs);
650 KeyvalsToString(std::make_tuple(key_vals...), key_valstr,
651 std::index_sequence_for<Pairs...>{});
652 return cpp_redis::client::mset(key_valstr);
655 template <
class... Pairs,
typename>
657 std::future<cpp_redis::reply> future =
mset(key_vals...);
662 template <
class... Ts,
class... Strings,
typename>
664 static_assert(
sizeof...(Ts) ==
sizeof...(Strings),
665 "Number of keys must equal number of output types.");
667 auto promise = std::make_shared<std::promise<std::tuple<Ts...>>>();
674 std::vector<std::string> command = {
"MGET", keys...};
675 send(command, [
this, command, promise](cpp_redis::reply& reply) {
676 if (!reply.is_array()) {
677 std::stringstream ss_error(
678 "RedisClient::mget(): Failed to get values from keys:");
679 for (size_t i = 1; i < command.size(); i++) ss_error <<
" " << command[i];
681 promise->set_exception(
682 std::make_exception_ptr(std::runtime_error(ss_error.str())));
685 std::tuple<Ts...> values;
687 RepliesToTuple(reply.as_array(), values,
688 std::index_sequence_for<Ts...>{});
689 }
catch (
const std::exception& e) {
690 std::stringstream ss_error(
691 "RedisClient::mget(): Failed to get values from keys:");
692 for (
size_t i = 1; i < command.size(); i++) ss_error <<
" " << command[i];
694 promise->set_exception(
695 std::make_exception_ptr(std::runtime_error(ss_error.str())));
698 promise->set_value(std::move(values));
700 return promise->get_future();
703 template <
typename T>
704 std::future<std::vector<T>> RedisClient::mget(
705 const std::vector<std::string>& keys) {
706 auto promise = std::make_shared<std::promise<std::vector<T>>>();
707 std::vector<std::string> command(keys.size() + 1);
709 std::copy(keys.begin(), keys.end(), command.begin() + 1);
710 send(command, [
this, command, promise](cpp_redis::reply& reply) {
711 if (!reply.is_array()) {
712 std::stringstream ss_error(
713 "RedisClient::mget(): Failed to get values from keys:");
714 for (size_t i = 1; i < command.size(); i++) ss_error <<
" " << command[i];
716 promise->set_exception(
717 std::make_exception_ptr(std::runtime_error(ss_error.str())));
720 std::vector<T> values;
721 values.reserve(reply.as_array().size());
723 for (const cpp_redis::reply& r : reply.as_array()) {
724 values.push_back(FromString<T>(r.as_string()));
726 }
catch (
const std::exception& e) {
727 std::stringstream ss_error(
728 "RedisClient::mget(): Failed to get values from keys:");
729 for (
size_t i = 1; i < command.size(); i++) ss_error <<
" " << command[i];
731 promise->set_exception(
732 std::make_exception_ptr(std::runtime_error(ss_error.str())));
735 promise->set_value(std::move(values));
737 return promise->get_future();
740 template <
class... Ts,
class... Args,
typename>
741 std::tuple<Ts...> RedisClient::sync_mget(
const Args&... args) {
742 std::future<std::tuple<Ts...>> future = mget<Ts...>(args...);
747 template <
typename T>
748 std::vector<T> RedisClient::sync_mget(
const std::vector<std::string>& keys) {
749 std::future<std::vector<T>> future = mget<T>(keys);
790 template <
typename T>
792 const std::vector<std::pair<std::string, T>>& key_vals,
793 const reply_callback_t& reply_callback) {
794 std::vector<std::string> command;
795 command.reserve(2 * key_vals.size() + 1);
796 command.push_back(
"MSET");
797 for (
const std::pair<std::string, T>& key_val : key_vals) {
798 command.push_back(key_val.first);
799 command.push_back(ToString(key_val.second));
801 send(command, reply_callback);
805 template <
typename T>
806 std::future<cpp_redis::reply> RedisClient::mset(
807 const std::vector<std::pair<std::string, T>>& key_vals) {
808 auto promise = std::make_shared<std::promise<cpp_redis::reply>>();
809 mset(key_vals, [promise](cpp_redis::reply& reply) {
811 promise->set_value(reply);
813 promise->set_exception(
814 std::make_exception_ptr(std::runtime_error(reply.error())));
817 return promise->get_future();
820 template <
typename T>
821 cpp_redis::reply RedisClient::sync_mset(
822 const std::vector<std::pair<std::string, T>>& key_vals) {
823 std::future<cpp_redis::reply> future = mset(key_vals);
828 template <
typename T>
829 RedisClient& RedisClient::publish(
const std::string& key,
const T& value,
830 const reply_callback_t& reply_callback) {
832 ToString(str, value);
833 cpp_redis::client::publish(key, str, reply_callback);
837 template <
typename T>
838 RedisClient& RedisClient::hset(
const std::string& key,
const std::string& field,
840 const reply_callback_t& reply_callback) {
842 ToString(str, value);
843 cpp_redis::client::hset(key, field, str, reply_callback);
847 template <
typename T>
848 std::future<cpp_redis::reply> RedisClient::hset(
const std::string& key,
849 const std::string& field,
852 ToString(str, value);
853 return cpp_redis::client::hset(key, field, str);
856 template <
typename T>
857 cpp_redis::reply RedisClient::sync_hset(
const std::string& key,
858 const std::string& field,
860 std::future<cpp_redis::reply> future = hset(key, field, value);
865 template <
typename T>
866 std::future<cpp_redis::reply> RedisClient::publish(
const std::string& key,
869 ToString(str, value);
870 return cpp_redis::client::publish(key, str);
873 template <
typename T>
874 cpp_redis::reply RedisClient::sync_publish(
const std::string& key,
876 std::future<cpp_redis::reply> future = publish(key, value);
881 template <
typename TSub,
typename TPub>
882 RedisClient& RedisClient::request(
const std::string& key_pub,
883 const TPub& value_pub,
884 const std::string& key_sub,
885 std::function<
void(TSub&&)>&& sub_callback) {
886 auto sub = std::make_shared<cpp_redis::subscriber>();
887 sub->connect(host_, port_);
889 sub->subscribe(key_sub, [sub, sub_callback = std::move(sub_callback)](
890 const std::string& key,
891 const std::string& str_value)
mutable {
892 sub_callback(FromString<TSub>(str_value));
893 std::thread([sub = std::move(sub), key]()
mutable {
894 sub->unsubscribe(key);
900 publish(key_pub, value_pub);
904 template <
typename TSub,
typename TPub>
905 std::future<TSub> RedisClient::request(
const std::string& key_pub,
906 const TPub& value_pub,
907 const std::string& key_sub) {
908 auto promise = std::make_shared<std::promise<TSub>>();
909 request<TSub>(key_pub, value_pub, key_sub, [promise](TSub&& value)
mutable {
910 if (!promise)
return;
911 promise->set_value(std::move(value));
914 return promise->get_future();
917 template <
typename TSub,
typename TPub>
918 TSub RedisClient::sync_request(
const std::string& key_pub,
919 const TPub& value_pub,
920 const std::string& key_sub) {
921 std::future<TSub> fut_value = request<TSub>(key_pub, value_pub, key_sub);
923 return fut_value.get();
926 inline RedisClient& RedisClient::scan(
927 size_t cursor,
const std::string& pattern,
928 std::unordered_set<std::string>&& keys,
929 std::function<
void(std::unordered_set<std::string>&&)>&& callback) {
930 cpp_redis::client::scan(
932 [
this, pattern, keys = std::move(keys),
933 callback = std::move(callback)](cpp_redis::reply& reply)
mutable {
935 if (!reply.is_array()) {
936 throw std::runtime_error(
"RedisClient::scan(): Invalid reply.");
938 const std::vector<cpp_redis::reply> replies = reply.as_array();
941 if (replies.size() != 2 || !replies[0].is_string() ||
942 !replies[1].is_array()) {
943 throw std::runtime_error(
"RedisClient::scan(): Invalid reply.");
947 if (replies.size() < 1 || !replies[0].is_string()) {
948 throw std::runtime_error(
"RedisClient::scan(): Invalid reply.");
950 size_t cursor_next = FromString<size_t>(replies[0].as_string());
951 if (cursor_next == 0) {
952 callback(std::move(keys));
957 if (replies.size() < 2 || !replies[1].is_array()) {
958 throw std::runtime_error(
"RedisClient::scan(): Invalid reply.");
960 const std::vector<cpp_redis::reply> keys_batch = replies[1].as_array();
963 for (
const cpp_redis::reply& key : keys_batch) {
964 if (!key.is_string()) {
965 throw std::runtime_error(
"RedisClient::scan(): Invalid reply.");
967 keys.insert(key.as_string());
970 scan(cursor_next, pattern, std::move(keys), std::move(callback));
976 inline RedisClient& RedisClient::scan(
977 const std::string& pattern,
978 std::function<
void(std::unordered_set<std::string>&&)>&& callback) {
979 return scan(0, pattern, {}, std::move(callback));
982 inline std::future<std::unordered_set<std::string>> RedisClient::scan(
983 const std::string& pattern) {
985 std::make_shared<std::promise<std::unordered_set<std::string>>>();
986 scan(pattern, [promise](std::unordered_set<std::string>&& keys)
mutable {
987 if (!promise)
return;
988 promise->set_value(std::move(keys));
991 return promise->get_future();
994 inline std::unordered_set<std::string> RedisClient::sync_scan(
995 const std::string& pattern) {
996 std::future<std::unordered_set<std::string>> fut_keys = scan(pattern);
998 return fut_keys.get();
Definition: redis_client.h:28
std::future< T > get(const std::string &key)
Definition: redis_client.h:544
cpp_redis::reply sync_set(const std::string &key, const T &value)
Definition: redis_client.h:609
std::future< cpp_redis::reply > set(const std::string &key, const T &value)
Definition: redis_client.h:594
cpp_redis::reply sync_mset(const Pairs &... key_vals)
Definition: redis_client.h:656
T sync_get(const std::string &key)
Definition: redis_client.h:580
std::future< std::tuple< Ts... > > mget(const Strings &... keys)
Definition: redis_client.h:663
std::future< cpp_redis::reply > mset(const Pairs &... key_vals)
Definition: redis_client.h:647
std::tuple< Ts... > sync_mget(const Strings &... keys)
Definition: ctrl_utils.cc:18
nlohmann::json FromString(const std::string &str)
Definition: json.h:27