Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Supports node black list for load balance #1985

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions src/meta/app_balance_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

#include <algorithm>
#include <functional>
#include <iterator>
#include <map>
#include <set>

Expand Down Expand Up @@ -66,6 +65,12 @@ app_balance_policy::app_balance_policy(meta_service *svc) : load_balance_policy(
void app_balance_policy::balance(bool checker, const meta_view *global_view, migration_list *list)
{
init(global_view, list);

if (_alive_nodes - _balancer_ignored_nodes.size() < 2) {
LOG_WARNING("the nodes that could balance is less 2");
return;
}

const app_mapper &apps = *_global_view->apps;
if (!execute_balance(apps,
checker,
Expand Down Expand Up @@ -116,7 +121,7 @@ bool app_balance_policy::copy_secondary(const std::shared_ptr<app_state> &app, b
int replicas_low = app->partition_count / _alive_nodes;

std::unique_ptr<copy_replica_operation> operation = std::make_unique<copy_secondary_operation>(
app, apps, nodes, host_port_vec, host_port_id, replicas_low);
app, apps, nodes, host_port_vec, host_port_id, _balancer_ignored_nodes, replicas_low);
return operation->start(_migration_result);
}

Expand All @@ -126,16 +131,17 @@ copy_secondary_operation::copy_secondary_operation(
node_mapper &nodes,
const std::vector<dsn::host_port> &host_port_vec,
const std::unordered_map<dsn::host_port, int> &host_port_id,
const std::set<dsn::host_port> balancer_ignored_nodes,
int replicas_low)
: copy_replica_operation(app, apps, nodes, host_port_vec, host_port_id),
: copy_replica_operation(app, apps, nodes, host_port_vec, host_port_id, balancer_ignored_nodes),
_replicas_low(replicas_low)
{
}

bool copy_secondary_operation::can_continue()
{
int id_min = *_ordered_host_port_ids.begin();
int id_max = *_ordered_host_port_ids.rbegin();
int id_min = find_min_load_nodes_without_blacklist();
int id_max = find_max_load_nodes_without_blacklist();
if (_partition_counts[id_max] <= _replicas_low ||
_partition_counts[id_max] - _partition_counts[id_min] <= 1) {
LOG_INFO("{}: stop copy secondary coz it will be balanced later", _app->get_logname());
Expand All @@ -151,7 +157,7 @@ int copy_secondary_operation::get_partition_count(const node_state &ns) const

bool copy_secondary_operation::can_select(gpid pid, migration_list *result)
{
int id_max = *_ordered_host_port_ids.rbegin();
int id_max = find_max_load_nodes_without_blacklist();
const node_state &max_ns = _nodes.at(_host_port_vec[id_max]);
if (max_ns.served_as(pid) == partition_status::PS_PRIMARY) {
LOG_DEBUG("{}: skip gpid({}.{}) coz it is primary",
Expand All @@ -170,7 +176,7 @@ bool copy_secondary_operation::can_select(gpid pid, migration_list *result)
return false;
}

int id_min = *_ordered_host_port_ids.begin();
int id_min = find_min_load_nodes_without_blacklist();
const node_state &min_ns = _nodes.at(_host_port_vec[id_min]);
if (min_ns.served_as(pid) != partition_status::PS_INACTIVE) {
LOG_DEBUG("{}: skip gpid({}.{}) coz it is already a member on the target node",
Expand Down
9 changes: 8 additions & 1 deletion src/meta/app_balance_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@

#include <gtest/gtest_prod.h>
#include <memory>
#include <set>
#include <unordered_map>
#include <vector>

#include "load_balance_policy.h"
#include "meta/meta_data.h"
#include "runtime/rpc/rpc_host_port.h"
#include "utils/command_manager.h"

namespace dsn {
class gpid;
class host_port;

namespace replication {
class meta_service;
Expand All @@ -51,6 +52,11 @@ class app_balance_policy : public load_balance_policy
bool _balancer_in_turn;
bool _only_primary_balancer;
bool _only_move_primary;

std::set<dsn::host_port> ignored_nodes;

friend class meta_service_test_app;
friend class meta_service_test;
};

class copy_secondary_operation : public copy_replica_operation
Expand All @@ -61,6 +67,7 @@ class copy_secondary_operation : public copy_replica_operation
node_mapper &nodes,
const std::vector<dsn::host_port> &address_vec,
const std::unordered_map<dsn::host_port, int> &address_id,
const std::set<dsn::host_port> balancer_ignored_nodes,
int replicas_low);
~copy_secondary_operation() = default;

Expand Down
68 changes: 56 additions & 12 deletions src/meta/cluster_balance_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,20 @@ uint32_t get_partition_count(const node_state &ns, balance_type type, int32_t ap
return (uint32_t)count;
}

uint32_t get_skew(const std::map<host_port, uint32_t> &count_map)
uint32_t get_skew(const std::map<host_port, uint32_t> &count_map,
std::set<dsn::host_port> balancer_ignored_nodes)
{
uint32_t min = UINT_MAX, max = 0;

for (auto iter : balancer_ignored_nodes) {
LOG_INFO("get_skew the ignored node is {}", iter);
}

for (const auto &kv : count_map) {
if (balancer_ignored_nodes.count(kv.first) != 0) {
continue;
}
LOG_INFO("the node is {}", kv.first);
if (kv.second < min) {
min = kv.second;
}
Expand All @@ -83,17 +93,31 @@ uint32_t get_skew(const std::map<host_port, uint32_t> &count_map)

void get_min_max_set(const std::map<host_port, uint32_t> &node_count_map,
/*out*/ std::set<host_port> &min_set,
/*out*/ std::set<host_port> &max_set)
/*out*/ std::set<host_port> &max_set,
std::set<dsn::host_port> balancer_ignored_nodes)
{
std::multimap<uint32_t, host_port> count_multimap = utils::flip_map(node_count_map);

auto range = count_multimap.equal_range(count_multimap.begin()->first);
for (auto iter = range.first; iter != range.second; ++iter) {
auto iter = count_multimap.begin();
while (iter != count_multimap.end() && balancer_ignored_nodes.count(iter->second) != 0) {
++iter;
}
auto min_range = count_multimap.equal_range(iter->first);
for (auto iter = min_range.first; iter != min_range.second; ++iter) {
if (balancer_ignored_nodes.count(iter->second) != 0)
continue;
min_set.insert(iter->second);
}

range = count_multimap.equal_range(count_multimap.rbegin()->first);
for (auto iter = range.first; iter != range.second; ++iter) {
auto iter_max = count_multimap.rbegin();
while (iter_max != count_multimap.rend() &&
balancer_ignored_nodes.count(iter_max->second) != 0) {
--iter_max;
}
auto max_range = count_multimap.equal_range(iter_max->first);
for (auto iter = max_range.first; iter != max_range.second; ++iter) {
if (balancer_ignored_nodes.count(iter->second) != 0)
continue;
max_set.insert(iter->second);
}
}
Expand All @@ -106,6 +130,11 @@ void cluster_balance_policy::balance(bool checker,
{
init(global_view, list);

if (_alive_nodes - _ignored_nodes.size() < 2) {
LOG_WARNING("the nodes that could balance is less 2");
return;
}

if (!execute_balance(*_global_view->apps,
false, /* balance_checker */
true, /* balance_in_turn */
Expand Down Expand Up @@ -198,8 +227,8 @@ bool cluster_balance_policy::get_cluster_migration_info(
if (!get_app_migration_info(app, nodes, type, info)) {
return false;
}
cluster_info.apps_skew[kv.first] = get_skew(info.replicas_count, _balancer_ignored_nodes);
cluster_info.apps_info.emplace(kv.first, std::move(info));
cluster_info.apps_skew[kv.first] = get_skew(info.replicas_count);
}

for (const auto &kv : nodes) {
Expand Down Expand Up @@ -282,7 +311,7 @@ bool cluster_balance_policy::get_next_move(const cluster_migration_info &cluster
return false;
}

auto server_skew = get_skew(cluster_info.replicas_count);
auto server_skew = get_skew(cluster_info.replicas_count, _balancer_ignored_nodes);
if (max_app_skew <= 1 && server_skew <= 1) {
LOG_INFO("every app is balanced and the cluster as a whole is balanced");
return false;
Expand All @@ -295,7 +324,10 @@ bool cluster_balance_policy::get_next_move(const cluster_migration_info &cluster
**/
std::set<host_port> cluster_min_count_nodes;
std::set<host_port> cluster_max_count_nodes;
get_min_max_set(cluster_info.replicas_count, cluster_min_count_nodes, cluster_max_count_nodes);
get_min_max_set(cluster_info.replicas_count,
cluster_min_count_nodes,
cluster_max_count_nodes,
_balancer_ignored_nodes);

bool found = false;
auto app_range = app_skew_multimap.equal_range(max_app_skew);
Expand All @@ -308,7 +340,7 @@ bool cluster_balance_policy::get_next_move(const cluster_migration_info &cluster
auto app_map = it->second.replicas_count;
std::set<host_port> app_min_count_nodes;
std::set<host_port> app_max_count_nodes;
get_min_max_set(app_map, app_min_count_nodes, app_max_count_nodes);
get_min_max_set(app_map, app_min_count_nodes, app_max_count_nodes, _balancer_ignored_nodes);

/**
* Compute the intersection of the replica servers most loaded for the app
Expand All @@ -327,7 +359,19 @@ bool cluster_balance_policy::get_next_move(const cluster_migration_info &cluster
* cluster skew the same or make it worse while keeping the app balanced.
**/
std::multimap<uint32_t, host_port> app_count_multimap = utils::flip_map(app_map);
if (app_count_multimap.rbegin()->first <= app_count_multimap.begin()->first + 1 &&

auto app_min_partition = app_count_multimap.begin();
while (app_min_partition != app_count_multimap.end() &&
is_ignored_node(app_min_partition->second)) {
++app_min_partition;
}
auto app_max_partition = app_count_multimap.rbegin();
while (app_max_partition != app_count_multimap.rend() &&
is_ignored_node(app_max_partition->second)) {
--app_max_partition;
}

if (app_max_partition->first <= app_min_partition->first + 1 &&
(app_cluster_min_set.empty() || app_cluster_max_set.empty())) {
LOG_INFO("do not move replicas of a balanced app({}) if the least (most) loaded "
"servers overall do not intersect the servers hosting the least (most) "
Expand Down Expand Up @@ -554,7 +598,7 @@ bool cluster_balance_policy::apply_move(const move_info &move,
move.pid, generate_balancer_request(*_global_view->apps, pc, move.type, source, target));
selected_pids.insert(move.pid);

cluster_info.apps_skew[app_id] = get_skew(app_info.replicas_count);
cluster_info.apps_skew[app_id] = get_skew(app_info.replicas_count, _balancer_ignored_nodes);
cluster_info.apps_info[app_id] = app_info;
cluster_info.nodes_info[source] = node_source;
cluster_info.nodes_info[target] = node_target;
Expand Down
9 changes: 7 additions & 2 deletions src/meta/cluster_balance_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ namespace replication {
class meta_service;

uint32_t get_partition_count(const node_state &ns, balance_type type, int32_t app_id);
uint32_t get_skew(const std::map<host_port, uint32_t> &count_map);
uint32_t get_skew(const std::map<host_port, uint32_t> &count_map,
std::set<dsn::host_port> balancer_ignored_nodes);
void get_min_max_set(const std::map<host_port, uint32_t> &node_count_map,
/*out*/ std::set<host_port> &min_set,
/*out*/ std::set<host_port> &max_set);
/*out*/ std::set<host_port> &max_set,
std::set<dsn::host_port> balancer_ignored_nodes);

class cluster_balance_policy : public load_balance_policy
{
Expand All @@ -58,6 +60,7 @@ class cluster_balance_policy : public load_balance_policy
struct cluster_migration_info;
struct move_info;
struct node_migration_info;
std::set<dsn::host_port> _ignored_nodes;

bool cluster_replica_balance(const meta_view *global_view,
const balance_type type,
Expand Down Expand Up @@ -174,6 +177,8 @@ class cluster_balance_policy : public load_balance_policy
balance_type type;
};

friend class meta_service_test_app;
friend class meta_service_test;
FRIEND_TEST(cluster_balance_policy, app_migration_info);
FRIEND_TEST(cluster_balance_policy, node_migration_info);
FRIEND_TEST(cluster_balance_policy, get_skew);
Expand Down
4 changes: 2 additions & 2 deletions src/meta/distributed_lock_service_simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,5 +300,5 @@ error_code distributed_lock_service_simple::query_cache(const std::string &lock_
}
return err;
}
}
}
} // namespace dist
} // namespace dsn
4 changes: 2 additions & 2 deletions src/meta/distributed_lock_service_simple.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,5 @@ class distributed_lock_service_simple : public distributed_lock_service

dsn::task_tracker _tracker;
};
}
}
} // namespace dist
} // namespace dsn
2 changes: 2 additions & 0 deletions src/meta/greedy_load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class greedy_load_balancer : public server_load_balancer
std::unique_ptr<load_balance_policy> _cluster_balance_policy;

std::unique_ptr<command_deregister> _get_balance_operation_count;
friend class meta_service_test_app;
friend class meta_service_test;

private:
void greedy_balancer(bool balance_checker);
Expand Down
Loading
Loading