diff options
Diffstat (limited to 'rubbos/app/tomcat-connectors-1.2.32-src/native/common/jk_lb_worker.c')
-rw-r--r-- | rubbos/app/tomcat-connectors-1.2.32-src/native/common/jk_lb_worker.c | 1823 |
1 file changed, 1823 insertions, 0 deletions
diff --git a/rubbos/app/tomcat-connectors-1.2.32-src/native/common/jk_lb_worker.c b/rubbos/app/tomcat-connectors-1.2.32-src/native/common/jk_lb_worker.c new file mode 100644 index 00000000..3c0ce67a --- /dev/null +++ b/rubbos/app/tomcat-connectors-1.2.32-src/native/common/jk_lb_worker.c @@ -0,0 +1,1823 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*************************************************************************** + * Description: Load balancer worker, knows how to load balance among * + * several workers. * + * Author: Gal Shachor <shachor@il.ibm.com> * + * Author: Mladen Turk <mturk@apache.org> * + * Author: Rainer Jung <rjung@apache.org> * + * Based on: * + * Version: $Revision: 1137200 $ * + ***************************************************************************/ + +#include "jk_pool.h" +#include "jk_service.h" +#include "jk_util.h" +#include "jk_worker.h" +#include "jk_lb_worker.h" +#include "jk_ajp13.h" +#include "jk_ajp13_worker.h" +#include "jk_ajp14_worker.h" +#include "jk_mt.h" +#include "jk_shm.h" + +/* + * The load balancing code in this + */ + +/* + * The following two macros need to be kept in sync with + * the existing values for state and activation. 
+ * Note: state <= JK_LB_STATE_FORCE is equivalent to + * state is none of JK_LB_STATE_BUSY, JK_LB_STATE_ERROR, JK_LB_STATE_PROBE + * Note: state <= JK_LB_STATE_BUSY is equivalent to + * state is none of JK_LB_STATE_ERROR, JK_LB_STATE_PROBE + * Note: activation == JK_LB_ACTIVATION_ACTIVE is equivalent to + * activation is none of JK_LB_ACTIVATION_STOPPED, JK_LB_ACTIVATION_DISABLED + */ +#define JK_WORKER_USABLE(s, activation) ((s) <= JK_LB_STATE_FORCE && activation == JK_LB_ACTIVATION_ACTIVE) +#define JK_WORKER_USABLE_STICKY(s, activation) ((s) <= JK_LB_STATE_BUSY && activation != JK_LB_ACTIVATION_STOPPED) + +static const char *lb_locking_type[] = { + JK_LB_LOCK_TEXT_OPTIMISTIC, + JK_LB_LOCK_TEXT_PESSIMISTIC, + "unknown", + NULL +}; + +static const char *lb_method_type[] = { + JK_LB_METHOD_TEXT_REQUESTS, + JK_LB_METHOD_TEXT_TRAFFIC, + JK_LB_METHOD_TEXT_BUSYNESS, + JK_LB_METHOD_TEXT_SESSIONS, + "unknown", + NULL +}; + +static const char *lb_state_type[] = { + JK_LB_STATE_TEXT_IDLE, + JK_LB_STATE_TEXT_OK, + JK_LB_STATE_TEXT_RECOVER, + JK_LB_STATE_TEXT_FORCE, + JK_LB_STATE_TEXT_BUSY, + JK_LB_STATE_TEXT_ERROR, + JK_LB_STATE_TEXT_PROBE, + "unknown", + NULL +}; + +static const char *lb_activation_type[] = { + JK_LB_ACTIVATION_TEXT_ACTIVE, + JK_LB_ACTIVATION_TEXT_DISABLED, + JK_LB_ACTIVATION_TEXT_STOPPED, + "unknown", + NULL +}; + +static const char *lb_first_log_names[] = { + JK_NOTE_LB_FIRST_NAME, + JK_NOTE_LB_FIRST_VALUE, + JK_NOTE_LB_FIRST_ACCESSED, + JK_NOTE_LB_FIRST_READ, + JK_NOTE_LB_FIRST_TRANSFERRED, + JK_NOTE_LB_FIRST_ERRORS, + JK_NOTE_LB_FIRST_BUSY, + JK_NOTE_LB_FIRST_ACTIVATION, + JK_NOTE_LB_FIRST_STATE, + NULL +}; + +static const char *lb_last_log_names[] = { + JK_NOTE_LB_LAST_NAME, + JK_NOTE_LB_LAST_VALUE, + JK_NOTE_LB_LAST_ACCESSED, + JK_NOTE_LB_LAST_READ, + JK_NOTE_LB_LAST_TRANSFERRED, + JK_NOTE_LB_LAST_ERRORS, + JK_NOTE_LB_LAST_BUSY, + JK_NOTE_LB_LAST_ACTIVATION, + JK_NOTE_LB_LAST_STATE, + NULL +}; + +struct lb_endpoint +{ + lb_worker_t *worker; + 
jk_endpoint_t endpoint; + int *states; +}; +typedef struct lb_endpoint lb_endpoint_t; + + +/* Calculate the greatest common divisor of two positive integers */ +static jk_uint64_t gcd(jk_uint64_t a, jk_uint64_t b) +{ + jk_uint64_t r; + if (b > a) { + r = a; + a = b; + b = r; + } + while (b > 0) { + r = a % b; + a = b; + b = r; + } + return a; +} + +/* Calculate the smallest common multiple of two positive integers */ +static jk_uint64_t scm(jk_uint64_t a, jk_uint64_t b) +{ + return a * b / gcd(a, b); +} + +/* Return the string representation of the lb lock type */ +const char *jk_lb_get_lock(lb_worker_t *p, jk_logger_t *l) +{ + return lb_locking_type[p->lblock]; +} + +/* Return the int representation of the lb lock type */ +int jk_lb_get_lock_code(const char *v) +{ + if (!v) + return JK_LB_LOCK_DEF; + else if (*v == 'o' || *v == 'O' || *v == '0') + return JK_LB_LOCK_OPTIMISTIC; + else if (*v == 'p' || *v == 'P' || *v == '1') + return JK_LB_LOCK_PESSIMISTIC; + else + return JK_LB_LOCK_DEF; +} + +/* Return the string representation of the lb method type */ +const char *jk_lb_get_method(lb_worker_t *p, jk_logger_t *l) +{ + return lb_method_type[p->lbmethod]; +} + +/* Return the int representation of the lb method type */ +int jk_lb_get_method_code(const char *v) +{ + if (!v) + return JK_LB_METHOD_DEF; + else if (*v == 'r' || *v == 'R' || *v == '0') + return JK_LB_METHOD_REQUESTS; + else if (*v == 't' || *v == 'T' || *v == '1') + return JK_LB_METHOD_TRAFFIC; + else if (*v == 'b' || *v == 'B' || *v == '2') + return JK_LB_METHOD_BUSYNESS; + else if (*v == 's' || *v == 'S' || *v == '3') + return JK_LB_METHOD_SESSIONS; + else + return JK_LB_METHOD_DEF; +} + +/* Return the string representation of the balance worker state */ +const char *jk_lb_get_state(lb_sub_worker_t *p, jk_logger_t *l) +{ + return lb_state_type[p->s->state]; +} + +/* Return the int representation of the lb state */ +int jk_lb_get_state_code(const char *v) +{ + if (!v) + return JK_LB_STATE_DEF; + else if 
(*v == 'i' || *v == 'I' || *v == 'n' || *v == 'N' || *v == '0') + return JK_LB_STATE_IDLE; + else if (*v == 'o' || *v == 'O' || *v == '1') + return JK_LB_STATE_OK; + else if (*v == 'r' || *v == 'R' || *v == '2') + return JK_LB_STATE_RECOVER; + else if (*v == 'f' || *v == 'F' || *v == '3') + return JK_LB_STATE_FORCE; + else if (*v == 'b' || *v == 'B' || *v == '4') + return JK_LB_STATE_BUSY; + else if (*v == 'e' || *v == 'E' || *v == '5') + return JK_LB_STATE_ERROR; + else if (*v == 'p' || *v == 'P' || *v == '6') + return JK_LB_STATE_PROBE; + else + return JK_LB_STATE_DEF; +} + +/* Return the string representation of the balance worker activation */ +/* based on the integer representation */ +const char *jk_lb_get_activation_direct(int activation, jk_logger_t *l) +{ + return lb_activation_type[activation]; +} + +/* Return the string representation of the balance worker activation */ +/* based on the sub worker struct */ +const char *jk_lb_get_activation(lb_sub_worker_t *p, jk_logger_t *l) +{ + return lb_activation_type[p->activation]; +} + +int jk_lb_get_activation_code(const char *v) +{ + if (!v) + return JK_LB_ACTIVATION_DEF; + else if (*v == 'a' || *v == 'A' || *v == '0') + return JK_LB_ACTIVATION_ACTIVE; + else if (*v == 'd' || *v == 'D' || *v == '1') + return JK_LB_ACTIVATION_DISABLED; + else if (*v == 's' || *v == 'S' || *v == '2') + return JK_LB_ACTIVATION_STOPPED; + else + return JK_LB_ACTIVATION_DEF; +} + +/* Update the load multipliers wrt. 
lb_factor */ +void update_mult(lb_worker_t *p, jk_logger_t *l) +{ + unsigned int i = 0; + jk_uint64_t s = 1; + JK_TRACE_ENTER(l); + for (i = 0; i < p->num_of_workers; i++) { + s = scm(s, p->lb_workers[i].lb_factor); + } + for (i = 0; i < p->num_of_workers; i++) { + p->lb_workers[i].lb_mult = s / p->lb_workers[i].lb_factor; + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "worker %s gets multiplicity %" + JK_UINT64_T_FMT, + p->lb_workers[i].name, + p->lb_workers[i].lb_mult); + } + JK_TRACE_EXIT(l); +} + +/* Reset all lb values. + */ +void reset_lb_values(lb_worker_t *p, jk_logger_t *l) +{ + unsigned int i = 0; + JK_TRACE_ENTER(l); + if (p->lbmethod != JK_LB_METHOD_BUSYNESS) { + for (i = 0; i < p->num_of_workers; i++) { + p->lb_workers[i].s->lb_value = 0; + } + } + JK_TRACE_EXIT(l); +} + +/* Syncing config values from shm */ +void jk_lb_pull(lb_worker_t *p, int locked, jk_logger_t *l) +{ + unsigned int i = 0; + + JK_TRACE_ENTER(l); + + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "syncing mem for lb '%s' from shm (%u->%u)", + p->name, p->sequence, p->s->h.sequence); + if (locked == JK_FALSE) + jk_shm_lock(); + p->sticky_session = p->s->sticky_session; + p->sticky_session_force = p->s->sticky_session_force; + p->recover_wait_time = p->s->recover_wait_time; + p->error_escalation_time = p->s->error_escalation_time; + p->max_reply_timeouts = p->s->max_reply_timeouts; + p->retries = p->s->retries; + p->retry_interval = p->s->retry_interval; + p->lbmethod = p->s->lbmethod; + p->lblock = p->s->lblock; + p->max_packet_size = p->s->max_packet_size; + p->sequence = p->s->h.sequence; + strncpy(p->session_cookie, p->s->session_cookie, JK_SHM_STR_SIZ); + strncpy(p->session_path, p->s->session_path, JK_SHM_STR_SIZ); + + for (i = 0; i < p->num_of_workers; i++) { + lb_sub_worker_t *w = &p->lb_workers[i]; + if (w->sequence != w->s->h.sequence) { + jk_worker_t *jw = w->worker; + ajp_worker_t *aw = (ajp_worker_t *)jw->worker_private; + + if (JK_IS_DEBUG_LEVEL(l)) + 
jk_log(l, JK_LOG_DEBUG, + "syncing mem for member '%s' of lb '%s' from shm", + w->name, p->name); + + jk_ajp_pull(aw, JK_TRUE, l); + strncpy(w->route, w->s->route, JK_SHM_STR_SIZ); + strncpy(w->domain, w->s->domain, JK_SHM_STR_SIZ); + strncpy(w->redirect, w->s->redirect, JK_SHM_STR_SIZ); + w->distance = w->s->distance; + w->activation = w->s->activation; + w->lb_factor = w->s->lb_factor; + w->lb_mult = w->s->lb_mult; + w->sequence = w->s->h.sequence; + } + } + if (locked == JK_FALSE) + jk_shm_unlock(); + + JK_TRACE_EXIT(l); +} + +/* Syncing config values to shm */ +void jk_lb_push(lb_worker_t *p, int locked, jk_logger_t *l) +{ + unsigned int i = 0; + + JK_TRACE_ENTER(l); + + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "syncing shm for lb '%s' from mem (%u->%u)", + p->name, p->s->h.sequence, p->sequence); + if (locked == JK_FALSE) + jk_shm_lock(); + p->s->sticky_session = p->sticky_session; + p->s->sticky_session_force = p->sticky_session_force; + p->s->recover_wait_time = p->recover_wait_time; + p->s->error_escalation_time = p->error_escalation_time; + p->s->max_reply_timeouts = p->max_reply_timeouts; + p->s->retries = p->retries; + p->s->retry_interval = p->retry_interval; + p->s->lbmethod = p->lbmethod; + p->s->lblock = p->lblock; + p->s->max_packet_size = p->max_packet_size; + p->s->h.sequence = p->sequence; + strncpy(p->s->session_cookie, p->session_cookie, JK_SHM_STR_SIZ); + strncpy(p->s->session_path, p->session_path, JK_SHM_STR_SIZ); + + for (i = 0; i < p->num_of_workers; i++) { + lb_sub_worker_t *w = &p->lb_workers[i]; + if (w->sequence != w->s->h.sequence) { + jk_worker_t *jw = w->worker; + ajp_worker_t *aw = (ajp_worker_t *)jw->worker_private; + + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "syncing shm for member '%s' of lb '%s' from mem", + w->name, p->name); + + jk_ajp_push(aw, JK_TRUE, l); + strncpy(w->s->route, w->route, JK_SHM_STR_SIZ); + strncpy(w->s->domain, w->domain, JK_SHM_STR_SIZ); + strncpy(w->s->redirect, w->redirect, 
JK_SHM_STR_SIZ); + w->s->distance = w->distance; + w->s->activation = w->activation; + w->s->lb_factor = w->lb_factor; + w->s->lb_mult = w->lb_mult; + w->s->h.sequence = w->sequence; + } + } + if (locked == JK_FALSE) + jk_shm_unlock(); + + JK_TRACE_EXIT(l); +} + +/* Retrieve the parameter with the given name */ +static char *get_path_param(jk_ws_service_t *s, const char *name) +{ + char *id_start = NULL; + for (id_start = strstr(s->req_uri, name); + id_start; id_start = strstr(id_start + 1, name)) { + if (id_start[strlen(name)] == '=') { + /* + * Session path-cookie was found, get it's value + */ + id_start += (1 + strlen(name)); + if (strlen(id_start)) { + char *id_end; + id_start = jk_pool_strdup(s->pool, id_start); + /* + * The query string is not part of req_uri, however + * to be on the safe side lets remove the trailing query + * string if appended... + */ + if ((id_end = strchr(id_start, '?')) != NULL) { + *id_end = '\0'; + } + /* + * Remove any trailing path element. + */ + if ((id_end = strchr(id_start, ';')) != NULL) { + *id_end = '\0'; + } + return id_start; + } + } + } + + return NULL; +} + +/* Retrieve the cookie with the given name */ +static char *get_cookie(jk_ws_service_t *s, const char *name) +{ + unsigned i; + char *result = NULL; + + for (i = 0; i < s->num_headers; i++) { + if (strcasecmp(s->headers_names[i], "cookie") == 0) { + + char *id_start; + for (id_start = strstr(s->headers_values[i], name); + id_start; id_start = strstr(id_start + 1, name)) { + if (id_start == s->headers_values[i] || + id_start[-1] == ';' || + id_start[-1] == ',' || isspace((int)id_start[-1])) { + id_start += strlen(name); + while (*id_start && isspace((int)(*id_start))) + ++id_start; + if (*id_start == '=' && id_start[1]) { + /* + * Session cookie was found, get it's value + */ + char *id_end; + size_t sz; + ++id_start; + if ((id_end = strpbrk(id_start, ";,")) != NULL) + sz = id_end - id_start; + else { + sz = strlen(id_start); + id_end = id_start + sz; + } + if 
(result == NULL) { + result = jk_pool_alloc(s->pool, sz + 1); + memcpy(result, id_start, sz); + result[sz] = '\0'; + } + else { + size_t osz = strlen(result) + 1; + result = + jk_pool_realloc(s->pool, osz + sz + 1, result, osz); + strcat(result, ";"); + strncat(result, id_start, sz); + } + id_start = id_end; + } + } + } + } + } + + return result; +} + + +/* Retrieve session id from the cookie or the parameter + * (parameter first) + */ +static char *get_sessionid(jk_ws_service_t *s, lb_worker_t *p, jk_logger_t *l) +{ + char *val; + val = get_path_param(s, p->session_path); + if (!val) { + val = get_cookie(s, p->session_cookie); + } + if (val && !*val) { + /* TODO: For now only log the empty sessions. + * However we should probably return 400 + * (BAD_REQUEST) in this case + */ + jk_log(l, JK_LOG_INFO, + "Detected empty session identifier."); + return NULL; + } + return val; +} + +static void close_workers(lb_worker_t *p, int num_of_workers, jk_logger_t *l) +{ + int i = 0; + for (i = 0; i < num_of_workers; i++) { + p->lb_workers[i].worker->destroy(&(p->lb_workers[i].worker), l); + } +} + +/* If the worker is in error state run + * retry on that worker. It will be marked as + * operational if the retry timeout is elapsed. + * The worker might still be unusable, but we try + * anyway. + * If the worker is in ok state and got no requests + * since the last global maintenance, we mark its + * state as not available. + * Return the number of workers not in error state. 
+ */ +static int recover_workers(lb_worker_t *p, + jk_uint64_t curmax, + time_t now, + jk_logger_t *l) +{ + unsigned int i; + int non_error = 0; + int elapsed; + lb_sub_worker_t *w = NULL; + ajp_worker_t *aw = NULL; + JK_TRACE_ENTER(l); + + if (p->sequence != p->s->h.sequence) + jk_lb_pull(p, JK_TRUE, l); + for (i = 0; i < p->num_of_workers; i++) { + w = &p->lb_workers[i]; + aw = (ajp_worker_t *)w->worker->worker_private; + if (w->s->state == JK_LB_STATE_ERROR) { + elapsed = (int)difftime(now, w->s->error_time); + if (elapsed <= p->recover_wait_time) { + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "worker %s will recover in %d seconds", + w->name, p->recover_wait_time - elapsed); + } + else { + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "worker %s is marked for recovery", + w->name); + if (p->lbmethod != JK_LB_METHOD_BUSYNESS) + w->s->lb_value = curmax; + aw->s->reply_timeouts = 0; + w->s->state = JK_LB_STATE_RECOVER; + non_error++; + } + } + else if (w->s->error_time > 0 && + (int)difftime(now, w->s->error_time) >= p->error_escalation_time) { + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "worker %s escalating local error to global error", + w->name); + w->s->state = JK_LB_STATE_ERROR; + } + else { + non_error++; + if (w->s->state == JK_LB_STATE_OK && + aw->s->used == w->s->elected_snapshot) + w->s->state = JK_LB_STATE_IDLE; + } + w->s->elected_snapshot = aw->s->used; + } + + JK_TRACE_EXIT(l); + return non_error; +} + +static int force_recovery(lb_worker_t *p, + int *states, + jk_logger_t *l) +{ + unsigned int i; + int forced = 0; + lb_sub_worker_t *w = NULL; + ajp_worker_t *aw = NULL; + JK_TRACE_ENTER(l); + + for (i = 0; i < p->num_of_workers; i++) { + w = &p->lb_workers[i]; + if (w->s->state == JK_LB_STATE_ERROR) { + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_INFO, + "worker %s is marked for forced recovery", + w->name); + aw = (ajp_worker_t *)w->worker->worker_private; + aw->s->reply_timeouts = 0; + w->s->state = 
JK_LB_STATE_FORCE; + if (states != NULL) + states[i] = JK_LB_STATE_FORCE; + forced++; + } + } + + JK_TRACE_EXIT(l); + return forced; +} + +/* Divide old load values by the decay factor, + * such that older values get less important + * for the routing decisions. + */ +static jk_uint64_t decay_load(lb_worker_t *p, + time_t exponent, + jk_logger_t *l) +{ + unsigned int i; + jk_uint64_t curmax = 0; + lb_sub_worker_t *w; + ajp_worker_t *aw; + + JK_TRACE_ENTER(l); + for (i = 0; i < p->num_of_workers; i++) { + w = &p->lb_workers[i]; + if (p->lbmethod != JK_LB_METHOD_BUSYNESS) { + w->s->lb_value >>= exponent; + if (w->s->lb_value > curmax) { + curmax = w->s->lb_value; + } + } + aw = (ajp_worker_t *)w->worker->worker_private; + aw->s->reply_timeouts >>= exponent; + } + JK_TRACE_EXIT(l); + return curmax; +} + +static int JK_METHOD maintain_workers(jk_worker_t *p, time_t now, jk_logger_t *l) +{ + unsigned int i = 0; + jk_uint64_t curmax = 0; + long delta; + + JK_TRACE_ENTER(l); + if (p && p->worker_private) { + lb_worker_t *lb = (lb_worker_t *)p->worker_private; + + for (i = 0; i < lb->num_of_workers; i++) { + if (lb->lb_workers[i].worker->maintain) { + lb->lb_workers[i].worker->maintain(lb->lb_workers[i].worker, now, l); + } + } + + jk_shm_lock(); + + /* Now we check for global maintenance (once for all processes). + * Checking workers for recovery and applying decay to the + * load values should not be done by each process individually. + * Therefore we globally sync and we use a global timestamp. + * Since it's possible that we come here a few milliseconds + * before the interval has passed, we allow a little tolerance. 
+ */ + delta = (long)difftime(now, lb->s->last_maintain_time) + JK_LB_MAINTAIN_TOLERANCE; + if (delta >= lb->maintain_time) { + lb->s->last_maintain_time = now; + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "decay with 2^%d", + JK_LB_DECAY_MULT * delta / lb->maintain_time); + curmax = decay_load(lb, JK_LB_DECAY_MULT * delta / lb->maintain_time, l); + if (!recover_workers(lb, curmax, now, l)) { + force_recovery(lb, NULL, l); + } + } + + jk_shm_unlock(); + + } + else { + JK_LOG_NULL_PARAMS(l); + } + + JK_TRACE_EXIT(l); + return JK_TRUE; +} + +static int find_by_session(jk_ws_service_t *s, + lb_worker_t *p, + const char *session_route, + jk_logger_t *l) +{ + + int rc = -1; + unsigned int i; + + for (i = 0; i < p->num_of_workers; i++) { + if (strcmp(p->lb_workers[i].route, session_route) == 0) { + rc = i; + break; + } + } + return rc; +} + +static int find_best_bydomain(jk_ws_service_t *s, + lb_worker_t *p, + const char *route_or_domain, + int *states, + jk_logger_t *l) +{ + unsigned int i; + int d = 0; + jk_uint64_t curmin = 0; + int candidate = -1; + int activation; + lb_sub_worker_t wr; + char *idpart = strchr(route_or_domain, '.'); + size_t domain_len = 0; + + if (idpart) { + domain_len = idpart - route_or_domain; + } + else { + domain_len = strlen(route_or_domain); + } + /* First try to see if we have available candidate */ + for (i = 0; i < p->num_of_workers; i++) { + /* Skip all workers that are not member of domain */ + wr = p->lb_workers[i]; + if (strlen(wr.domain) == 0 || + strlen(wr.domain) != domain_len || + strncmp(wr.domain, route_or_domain, domain_len)) + continue; + /* Take into calculation only the workers that are + * not in error state, stopped, disabled or busy. + */ + activation = s->extension.activation ? 
+ s->extension.activation[i] : + JK_LB_ACTIVATION_UNSET; + if (activation == JK_LB_ACTIVATION_UNSET) + activation = wr.activation; + if (JK_WORKER_USABLE(states[wr.i], activation)) { + if (candidate < 0 || wr.distance < d || + (wr.s->lb_value < curmin && + wr.distance == d)) { + candidate = i; + curmin = wr.s->lb_value; + d = wr.distance; + } + } + } + return candidate; +} + + +static int find_best_byvalue(jk_ws_service_t *s, + lb_worker_t *p, + int *states, + jk_logger_t *l) +{ + unsigned int i; + unsigned int j; + unsigned int offset; + int d = 0; + jk_uint64_t curmin = 0; + + /* find the least busy worker */ + int candidate = -1; + int activation; + lb_sub_worker_t wr; + + offset = p->next_offset; + + /* First try to see if we have available candidate */ + for (j = offset; j < p->num_of_workers + offset; j++) { + i = j % p->num_of_workers; + wr = p->lb_workers[i]; + activation = s->extension.activation ? + s->extension.activation[i] : + JK_LB_ACTIVATION_UNSET; + if (activation == JK_LB_ACTIVATION_UNSET) + activation = wr.activation; + + /* Take into calculation only the workers that are + * not in error state, stopped, disabled or busy. + */ + if (JK_WORKER_USABLE(states[wr.i], activation)) { + if (candidate < 0 || wr.distance < d || + (wr.s->lb_value < curmin && + wr.distance == d)) { + candidate = i; + curmin = wr.s->lb_value; + d = wr.distance; + p->next_offset = i + 1; + } + } + } + return candidate; +} + +static int find_bysession_route(jk_ws_service_t *s, + lb_worker_t *p, + const char *session_route, + int *states, + jk_logger_t *l) +{ + int uses_domain = 0; + int candidate = -1; + + candidate = find_by_session(s, p, session_route, l); + if (candidate < 0) { + uses_domain = 1; + candidate = find_best_bydomain(s, p, session_route, states, l); + } + if (candidate >= 0) { + lb_sub_worker_t wr = p->lb_workers[candidate]; + int activation; + if (uses_domain) + s->route = wr.domain; + activation = s->extension.activation ? 
+ s->extension.activation[candidate] : + JK_LB_ACTIVATION_UNSET; + if (activation == JK_LB_ACTIVATION_UNSET) + activation = wr.activation; + if (!JK_WORKER_USABLE_STICKY(states[wr.i], activation)) { + /* We have a worker that is error state or stopped. + * If it has a redirection set use that redirection worker. + * This enables to safely remove the member from the + * balancer. Of course you will need a some kind of + * session replication between those two remote. + */ + if (p->sticky_session_force) + candidate = -1; + else if (*wr.redirect) { + candidate = find_by_session(s, p, wr.redirect, l); + s->route = NULL; + } + else if (*wr.domain && !uses_domain) { + candidate = find_best_bydomain(s, p, wr.domain, states, l); + if (candidate >= 0) { + s->route = wr.domain; + } else { + s->route = NULL; + } + } + if (candidate >= 0) { + wr = p->lb_workers[candidate]; + activation = s->extension.activation ? + s->extension.activation[candidate] : + JK_LB_ACTIVATION_UNSET; + if (activation == JK_LB_ACTIVATION_UNSET) + activation = wr.activation; + if (!JK_WORKER_USABLE_STICKY(states[wr.i], activation)) + candidate = -1; + } + } + } + return candidate; +} + +static int find_failover_worker(jk_ws_service_t *s, + lb_worker_t *p, + int *states, + jk_logger_t *l) +{ + int rc = -1; + unsigned int i; + char *redirect = NULL; + + for (i = 0; i < p->num_of_workers; i++) { + if (strlen(p->lb_workers[i].redirect)) { + redirect = p->lb_workers[i].redirect; + break; + } + } + if (redirect) + rc = find_bysession_route(s, p, redirect, states, l); + return rc; +} + +static int find_best_worker(jk_ws_service_t *s, + lb_worker_t *p, + int *states, + jk_logger_t *l) +{ + int rc = -1; + + rc = find_best_byvalue(s, p, states, l); + /* By default use worker route as session route */ + if (rc < 0) + rc = find_failover_worker(s, p, states, l); + return rc; +} + +static int get_most_suitable_worker(jk_ws_service_t *s, + lb_worker_t *p, + char *sessionid, + int *states, + jk_logger_t *l) +{ + int 
rc = -1; + int r; + + JK_TRACE_ENTER(l); + if (p->num_of_workers == 1) { + /* No need to find the best worker + * if there is a single one + */ + int activation = s->extension.activation ? + s->extension.activation[0] : + JK_LB_ACTIVATION_UNSET; + if (activation == JK_LB_ACTIVATION_UNSET) + activation = p->lb_workers[0].activation; + if (JK_WORKER_USABLE_STICKY(states[0], activation)) { + if (activation != JK_LB_ACTIVATION_DISABLED) { + JK_TRACE_EXIT(l); + return 0; + } + } + else { + JK_TRACE_EXIT(l); + return -1; + } + } + if (p->lblock == JK_LB_LOCK_PESSIMISTIC) + r = jk_shm_lock(); + else { + JK_ENTER_CS(&(p->cs), r); + } + if (!r) { + jk_log(l, JK_LOG_ERROR, + "locking failed (errno=%d)", + errno); + } + if (sessionid) { + char *session = sessionid; + while (sessionid) { + char *next = strchr(sessionid, ';'); + char *session_route = NULL; + if (next) + *next++ = '\0'; + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "searching worker for partial sessionid %s", + sessionid); + session_route = strchr(sessionid, '.'); + if (session_route) { + ++session_route; + + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "searching worker for session route %s", + session_route); + + /* We have a session route. Whow! 
*/ + rc = find_bysession_route(s, p, session_route, states, l); + if (rc >= 0) { + lb_sub_worker_t *wr = &(p->lb_workers[rc]); + if (p->lblock == JK_LB_LOCK_PESSIMISTIC) + jk_shm_unlock(); + else { + JK_LEAVE_CS(&(p->cs), r); + } + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "found worker %s (%s) for route %s and partial sessionid %s", + wr->name, wr->route, session_route, sessionid); + JK_TRACE_EXIT(l); + return rc; + } + } + /* Try next partial sessionid if present */ + sessionid = next; + rc = -1; + } + if (rc < 0 && p->sticky_session_force) { + if (p->lblock == JK_LB_LOCK_PESSIMISTIC) + jk_shm_unlock(); + else { + JK_LEAVE_CS(&(p->cs), r); + } + jk_log(l, JK_LOG_INFO, + "all workers are in error state for session %s", + session); + JK_TRACE_EXIT(l); + return -1; + } + } + rc = find_best_worker(s, p, states, l); + if (p->lblock == JK_LB_LOCK_PESSIMISTIC) + jk_shm_unlock(); + else { + JK_LEAVE_CS(&(p->cs), r); + } + if (rc >= 0) { + lb_sub_worker_t *wr = &(p->lb_workers[rc]); + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "found best worker %s (%s) using method '%s'", + wr->name, wr->route, jk_lb_get_method(p, l)); + JK_TRACE_EXIT(l); + return rc; + } + JK_TRACE_EXIT(l); + return -1; +} + +static void lb_add_log_items(jk_ws_service_t *s, + const char *const *log_names, + lb_sub_worker_t *w, + jk_logger_t *l) +{ + ajp_worker_t *aw = (ajp_worker_t *)w->worker->worker_private; + const char **log_values = jk_pool_alloc(s->pool, sizeof(char *) * JK_LB_NOTES_COUNT); + char *buf = jk_pool_alloc(s->pool, sizeof(char *) * JK_LB_NOTES_COUNT * JK_LB_UINT64_STR_SZ); + if (log_values && buf) { + /* JK_NOTE_LB_FIRST/LAST_NAME */ + log_values[0] = w->name; + snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, w->s->lb_value); + /* JK_NOTE_LB_FIRST/LAST_VALUE */ + log_values[1] = buf; + buf += JK_LB_UINT64_STR_SZ; + snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, aw->s->used); + /* JK_NOTE_LB_FIRST/LAST_ACCESSED */ + log_values[2] = buf; + buf += 
JK_LB_UINT64_STR_SZ; + snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, aw->s->readed); + /* JK_NOTE_LB_FIRST/LAST_READ */ + log_values[3] = buf; + buf += JK_LB_UINT64_STR_SZ; + snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, aw->s->transferred); + /* JK_NOTE_LB_FIRST/LAST_TRANSFERRED */ + log_values[4] = buf; + buf += JK_LB_UINT64_STR_SZ; + snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT32_T_FMT, w->s->errors); + /* JK_NOTE_LB_FIRST/LAST_ERRORS */ + log_values[5] = buf; + buf += JK_LB_UINT64_STR_SZ; + snprintf(buf, JK_LB_UINT64_STR_SZ, "%d", aw->s->busy); + /* JK_NOTE_LB_FIRST/LAST_BUSY */ + log_values[6] = buf; + /* JK_NOTE_LB_FIRST/LAST_ACTIVATION */ + log_values[7] = jk_lb_get_activation(w, l); + /* JK_NOTE_LB_FIRST/LAST_STATE */ + log_values[8] = jk_lb_get_state(w, l); + s->add_log_items(s, log_names, log_values, JK_LB_NOTES_COUNT); + } +} + +static int JK_METHOD service(jk_endpoint_t *e, + jk_ws_service_t *s, + jk_logger_t *l, int *is_error) +{ + lb_endpoint_t *p; + int attempt = 0; + lb_sub_worker_t *prec = NULL; + int num_of_workers; + int first = 1; + int was_forced = 0; + int recoverable = JK_TRUE; + int rc = JK_UNSET; + char *sessionid = NULL; + int i; + int retry = 0; + + JK_TRACE_ENTER(l); + + if (!e || !e->endpoint_private || !s || !is_error) { + JK_LOG_NULL_PARAMS(l); + if (is_error) + *is_error = JK_HTTP_SERVER_ERROR; + JK_TRACE_EXIT(l); + return JK_FALSE; + } + + p = e->endpoint_private; + num_of_workers = p->worker->num_of_workers; + + /* Set returned error to OK */ + *is_error = JK_HTTP_OK; + + if (p->worker->sequence != p->worker->s->h.sequence) + jk_lb_pull(p->worker, JK_FALSE, l); + for (i = 0; i < num_of_workers; i++) { + lb_sub_worker_t *rec = &(p->worker->lb_workers[i]); + if (rec->s->state == JK_LB_STATE_BUSY) { + if (ajp_has_endpoint(rec->worker, l)) { + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "worker %s busy count fixed", + rec->name); + rec->s->state = JK_LB_STATE_OK; + } + } + /* Copy the shared state info 
*/ + p->states[i] = rec->s->state; + } + + /* set the recovery post, for LB mode */ + s->reco_buf = jk_b_new(s->pool); + if (!s->reco_buf) { + *is_error = JK_HTTP_SERVER_ERROR; + jk_log(l, JK_LOG_ERROR, + "Failed allocating AJP message"); + JK_TRACE_EXIT(l); + return JK_SERVER_ERROR; + } + if (jk_b_set_buffer_size(s->reco_buf, p->worker->max_packet_size)) { + *is_error = JK_HTTP_SERVER_ERROR; + jk_log(l, JK_LOG_ERROR, + "Failed allocating AJP message buffer"); + JK_TRACE_EXIT(l); + return JK_SERVER_ERROR; + } + jk_b_reset(s->reco_buf); + s->reco_status = RECO_INITED; + + if (p->worker->sticky_session) { + /* Use sessionid only if sticky_session is + * defined for this load balancer + */ + sessionid = get_sessionid(s, p->worker, l); + } + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "service sticky_session=%d id='%s'", + p->worker->sticky_session, sessionid ? sessionid : "empty"); + + while (recoverable == JK_TRUE) { + if (attempt >= num_of_workers) { + retry++; + if (retry >= p->worker->retries) { + /* Done with retrying */ + break; + } + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "retry %d, sleeping for %d ms before retrying", + retry, p->worker->retry_interval); + jk_sleep(p->worker->retry_interval); + /* Pull shared memory if something changed during sleep */ + if (p->worker->sequence != p->worker->s->h.sequence) + jk_lb_pull(p->worker, JK_FALSE, l); + for (i = 0; i < num_of_workers; i++) { + /* Copy the shared state info */ + p->states[i] = p->worker->lb_workers[i].s->state; + } + attempt = 0; + } + rc = JK_FALSE; + *is_error = JK_HTTP_SERVER_BUSY; + i = get_most_suitable_worker(s, p->worker, sessionid, p->states, l); + if (i >= 0) { + int r; + int is_service_error = JK_HTTP_OK; + lb_sub_worker_t *rec = &(p->worker->lb_workers[i]); + ajp_worker_t *aw = (ajp_worker_t *)rec->worker->worker_private; + jk_endpoint_t *end = NULL; + int activation = s->extension.activation ? 
+ s->extension.activation[i] : + JK_LB_ACTIVATION_UNSET; + if (activation == JK_LB_ACTIVATION_UNSET) + activation = rec->activation; + if (!s->route) + s->route = rec->route; + s->activation = jk_lb_get_activation_direct(activation, l); + prec = rec; + + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "service worker=%s route=%s", + rec->name, s->route); + + if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC) + jk_shm_lock(); + if (rec->s->state == JK_LB_STATE_RECOVER) { + rec->s->state = JK_LB_STATE_PROBE; + p->states[rec->i] = JK_LB_STATE_PROBE; + } + if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC) + jk_shm_unlock(); + + r = rec->worker->get_endpoint(rec->worker, &end, l); + if (!r || !end) { + /* If we can not get the endpoint + * mark the worker as busy rather then + * as in error if the retry number is + * greater then the number of retries. + */ + if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC) + jk_shm_lock(); + if (rec->s->state != JK_LB_STATE_ERROR) { + rec->s->state = JK_LB_STATE_BUSY; + p->states[rec->i] = JK_LB_STATE_BUSY; + } + if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC) + jk_shm_unlock(); + jk_log(l, JK_LOG_INFO, + "could not get free endpoint for worker %s (%d retries)", + rec->name, retry); + } + else { + int service_stat = JK_UNSET; + jk_uint64_t rd = 0; + jk_uint64_t wr = 0; + /* Reset endpoint read and write sizes for + * this request. 
+ */ + end->rd = end->wr = 0; + end->recoverable = JK_TRUE; + if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC) + jk_shm_lock(); + + /* Increment the number of workers serving request */ + p->worker->s->busy++; + rec->s->busy++; + if (p->worker->s->busy > p->worker->s->max_busy) + p->worker->s->max_busy = p->worker->s->busy; + if ( (p->worker->lbmethod == JK_LB_METHOD_REQUESTS) || + (p->worker->lbmethod == JK_LB_METHOD_BUSYNESS) || + (p->worker->lbmethod == JK_LB_METHOD_SESSIONS && + !sessionid) ) + rec->s->lb_value += rec->lb_mult; + if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC) + jk_shm_unlock(); + + service_stat = end->service(end, s, l, &is_service_error); + rd = end->rd; + wr = end->wr; + recoverable = end->recoverable; + *is_error = is_service_error; + end->done(&end, l); + + if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC) + jk_shm_lock(); + + /* Update partial reads and writes if any */ + if (p->worker->lbmethod == JK_LB_METHOD_TRAFFIC) { + rec->s->lb_value += (rd+wr)*rec->lb_mult; + } + else if (p->worker->lbmethod == JK_LB_METHOD_BUSYNESS) { + if (rec->s->lb_value >= rec->lb_mult) { + rec->s->lb_value -= rec->lb_mult; + } + else { + rec->s->lb_value = 0; + if (JK_IS_DEBUG_LEVEL(l)) { + jk_log(l, JK_LOG_DEBUG, + "worker %s has load value to low (%" + JK_UINT64_T_FMT + " < %" + JK_UINT64_T_FMT + ") ", + "- correcting to 0", + rec->name, + rec->s->lb_value, + rec->lb_mult); + } + } + } + + /* When have an endpoint and we ran a request, assume + * we are OK, unless we last were in error. + * We will below explicitely set OK or ERROR according + * to the returned service_stat. + */ + if (rec->s->state != JK_LB_STATE_ERROR) { + rec->s->state = JK_LB_STATE_OK; + p->states[rec->i] = JK_LB_STATE_OK; + } + /* Decrement the busy worker count. + * Check if the busy was reset to zero by graceful + * restart of the server. 
                 */
                if (p->worker->s->busy)
                    p->worker->s->busy--;
                if (rec->s->busy)
                    rec->s->busy--;
                /* Map the service() result onto the member's shared state
                 * (rec->s->state, visible to all processes) and this
                 * request's private view (p->states[], which steers the
                 * fail-over decision for the remainder of this request).
                 */
                if (service_stat == JK_TRUE) {
                    /*
                     * Successful request.
                     */
                    rec->s->state = JK_LB_STATE_OK;
                    p->states[rec->i] = JK_LB_STATE_OK;
                    rec->s->error_time = 0;
                    rc = JK_TRUE;
                    recoverable = JK_UNSET;
                }
                else if (service_stat == JK_CLIENT_ERROR) {
                    /*
                     * Client error !!!
                     * Since this is bad request do not fail over.
                     */
                    rec->s->state = JK_LB_STATE_OK;
                    p->states[rec->i] = JK_LB_STATE_ERROR;
                    rec->s->error_time = 0;
                    rc = JK_CLIENT_ERROR;
                    recoverable = JK_FALSE;
                }
                else if (service_stat == JK_SERVER_ERROR) {
                    /*
                     * Internal JK server error.
                     * Keep previous global state.
                     * Do not try to reuse the same node for the same request.
                     * Failing over to another node could help.
                     */
                    p->states[rec->i] = JK_LB_STATE_ERROR;
                    rc = JK_FALSE;
                }
                else if (service_stat == JK_AJP_PROTOCOL_ERROR) {
                    /*
                     * We've received a bad AJP message from the backend.
                     * Keep previous global state.
                     * Do not try to reuse the same node for the same request.
                     * Failing over to another node could help.
                     */
                    p->states[rec->i] = JK_LB_STATE_ERROR;
                    rc = JK_FALSE;
                }
                else if (service_stat == JK_STATUS_ERROR) {
                    /*
                     * Status code configured as service is down.
                     * The node is fine.
                     * Do not try to reuse the same node for the same request.
                     * Failing over to another node could help.
                     */
                    rec->s->state = JK_LB_STATE_OK;
                    p->states[rec->i] = JK_LB_STATE_ERROR;
                    rec->s->error_time = 0;
                    rc = JK_FALSE;
                }
                else if (service_stat == JK_STATUS_FATAL_ERROR) {
                    /*
                     * Status code configured as service is down.
                     * Mark the node as bad.
                     * Do not try to reuse the same node for the same request.
                     * Failing over to another node could help.
                 */
                    rec->s->errors++;
                    rec->s->state = JK_LB_STATE_ERROR;
                    p->states[rec->i] = JK_LB_STATE_ERROR;
                    rec->s->error_time = time(NULL);
                    rc = JK_FALSE;
                }
                else if (service_stat == JK_REPLY_TIMEOUT) {
                    if (aw->s->reply_timeouts > (unsigned)p->worker->max_reply_timeouts) {
                        /*
                         * Service failed - to many reply timeouts
                         * Mark the node as bad.
                         * Do not try to reuse the same node for the same request.
                         * Failing over to another node could help.
                         */
                        rec->s->errors++;
                        rec->s->state = JK_LB_STATE_ERROR;
                        p->states[rec->i] = JK_LB_STATE_ERROR;
                        rec->s->error_time = time(NULL);
                    }
                    else {
                        /*
                         * Reply timeout, bot not yet too many of them.
                         * Keep previous global state.
                         * Do not try to reuse the same node for the same request.
                         * Failing over to another node could help.
                         */
                        p->states[rec->i] = JK_LB_STATE_ERROR;
                    }
                    rc = JK_FALSE;
                }
                else {
                    /*
                     * Various unspecific error cases.
                     * Keep previous global state, if we are not in local error since to long.
                     * Do not try to reuse the same node for the same request.
                     * Failing over to another node could help.
                     */
                    time_t now = time(NULL);
                    rec->s->errors++;
                    /* Escalate a local error to a global (shared) error when
                     * the member is idle, escalation is disabled (time 0),
                     * or it has been in local error at least
                     * error_escalation_time seconds.
                     */
                    if (rec->s->busy == 0 ||
                        p->worker->error_escalation_time == 0 ||
                        (rec->s->error_time > 0 &&
                         (int)difftime(now, rec->s->error_time) >= p->worker->error_escalation_time)) {
                        if (JK_IS_DEBUG_LEVEL(l))
                            jk_log(l, JK_LOG_DEBUG,
                                   "worker %s escalating local error to global error",
                                   rec->name);
                        rec->s->state = JK_LB_STATE_ERROR;
                    }
                    p->states[rec->i] = JK_LB_STATE_ERROR;
                    if (rec->s->error_time == 0) {
                        rec->s->error_time = now;
                    }
                    rc = JK_FALSE;
                }
                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
                    jk_shm_unlock();
                if (p->states[rec->i] == JK_LB_STATE_ERROR)
                    jk_log(l, JK_LOG_INFO,
                           "service failed, worker %s is in %serror state",
                           rec->name,
                           rec->s->state == JK_LB_STATE_ERROR ?
"" : "local ");
            }
            if (recoverable == JK_TRUE) {
                /*
                 * Error is recoverable by submitting the request to
                 * another worker... Lets try to do that.
                 */
                if (JK_IS_DEBUG_LEVEL(l))
                    jk_log(l, JK_LOG_DEBUG,
                           "recoverable error... will try to recover on other worker");
            }
            else {
                /*
                 * Error is not recoverable - break with an error.
                 */
                if (rc == JK_CLIENT_ERROR)
                    jk_log(l, JK_LOG_INFO,
                           "unrecoverable error %d, request failed."
                           " Client failed in the middle of request,"
                           " we can't recover to another instance.",
                           *is_error);
                else if (rc != JK_TRUE)
                    jk_log(l, JK_LOG_ERROR,
                           "unrecoverable error %d, request failed."
                           " Tomcat failed in the middle of request,"
                           " we can't recover to another instance.",
                           *is_error);
            }
            /* Emit the "first worker tried" log items only once per request. */
            if (first == 1 && s->add_log_items) {
                first = 0;
                lb_add_log_items(s, lb_first_log_names, prec, l);
            }
        }
        else {
            /* No more workers left ... */
            if (!was_forced) {
                int nf;
                /* Force recovery only once.
                 * If it still fails, Tomcat is still disconnected.
                 */
                jk_shm_lock();
                nf = force_recovery(p->worker, p->states, l);
                jk_shm_unlock();
                was_forced = 1;
                if (nf) {
                    /* We have forced recovery.
                     * Reset the service loop and go again
                     */
                    prec = NULL;
                    jk_log(l, JK_LOG_INFO,
                           "Forcing recovery once for %d workers", nf);
                    continue;
                }
                else {
                    /* No workers in error state.
                     * Somebody set them all to disabled?
                 */
                    jk_log(l, JK_LOG_INFO,
                           "All tomcat instances failed, no more workers "
                           "left for recovery (attempt=%d, retry=%d)",
                           attempt, retry);
                    *is_error = JK_HTTP_SERVER_BUSY;
                    rc = JK_FALSE;
                }
            }
            else {
                jk_log(l, JK_LOG_INFO,
                       "All tomcat instances failed, no more workers "
                       "left (attempt=%d, retry=%d)",
                       attempt, retry);
                *is_error = JK_HTTP_SERVER_BUSY;
                rc = JK_FALSE;
            }
        }
        attempt++;
    }
    if (recoverable == JK_TRUE) {
        jk_log(l, JK_LOG_INFO,
               "All tomcat instances are busy or in error state");
        /* rc and http error must be set above */
    }
    if (rc == JK_FALSE) {
        jk_log(l, JK_LOG_ERROR,
               "All tomcat instances failed, no more workers left");
    }
    if (prec && s->add_log_items) {
        lb_add_log_items(s, lb_last_log_names, prec, l);
    }

    JK_TRACE_EXIT(l);
    return rc;
}

/*
 * Release a load balancer endpoint previously handed out by
 * get_endpoint(): frees the private per-request state array and the
 * endpoint record itself, and NULLs the caller's pointer.
 * Returns JK_TRUE on success, JK_FALSE on NULL input.
 */
static int JK_METHOD done(jk_endpoint_t **e, jk_logger_t *l)
{
    JK_TRACE_ENTER(l);

    if (e && *e && (*e)->endpoint_private) {
        lb_endpoint_t *p = (*e)->endpoint_private;
        free(p->states);
        free(p);
        *e = NULL;
        JK_TRACE_EXIT(l);
        return JK_TRUE;
    }

    JK_LOG_NULL_PARAMS(l);
    JK_TRACE_EXIT(l);
    return JK_FALSE;
}

/*
 * Validate and build the load balancer configuration from the
 * workers.properties map: reads sticky-session flags and the member
 * worker list, allocates the per-member records (shared memory backed),
 * and creates each member worker. Returns JK_TRUE only when every
 * member was created and all routes are unique.
 */
static int JK_METHOD validate(jk_worker_t *pThis,
                              jk_map_t *props,
                              jk_worker_env_t *we, jk_logger_t *l)
{
    JK_TRACE_ENTER(l);

    if (pThis && pThis->worker_private) {
        lb_worker_t *p = pThis->worker_private;
        char **worker_names;
        unsigned int num_of_workers;
        const char *secret;

        p->sticky_session = jk_get_is_sticky_session(props, p->name);
        p->sticky_session_force = jk_get_is_sticky_session_force(props, p->name);
        secret = jk_get_worker_secret(props, p->name);

        if (jk_get_lb_worker_list(props,
                                  p->name,
                                  &worker_names,
                                  &num_of_workers) && num_of_workers) {
            unsigned int i = 0;
            unsigned int j = 0;
            p->max_packet_size = DEF_BUFFER_SZ;
            p->lb_workers = jk_pool_alloc(&p->p,
                                          num_of_workers *
                                          sizeof(lb_sub_worker_t));
            if (!p->lb_workers) {
                JK_TRACE_EXIT(l);
                return JK_FALSE;
            }
            memset(p->lb_workers, 0,
num_of_workers * sizeof(lb_sub_worker_t));
            /* Allocate one shared-memory record per member first, so a
             * failure leaves no half-created workers behind.
             */
            for (i = 0; i < num_of_workers; i++) {
                p->lb_workers[i].s = jk_shm_alloc_lb_sub_worker(&p->p);
                if (p->lb_workers[i].s == NULL) {
                    jk_log(l, JK_LOG_ERROR,
                           "allocating lb sub worker record from shared memory");
                    JK_TRACE_EXIT(l);
                    return JK_FALSE;
                }
            }

            /* Configure and create each member worker; `break` on a
             * creation failure leaves i < num_of_workers, which is
             * detected after the loop.
             */
            for (i = 0; i < num_of_workers; i++) {
                const char *s;
                unsigned int ms;

                p->lb_workers[i].i = i;
                /* NOTE(review): strncpy with JK_SHM_STR_SIZ does not
                 * guarantee NUL termination if the name is exactly that
                 * long - presumably the buffers are JK_SHM_STR_SIZ+1 or
                 * names are pre-validated; confirm against jk_shm.h.
                 */
                strncpy(p->lb_workers[i].name, worker_names[i],
                        JK_SHM_STR_SIZ);
                strncpy(p->lb_workers[i].s->h.name, worker_names[i],
                        JK_SHM_STR_SIZ);
                p->lb_workers[i].sequence = 0;
                p->lb_workers[i].s->h.sequence = 0;
                p->lb_workers[i].lb_factor =
                    jk_get_lb_factor(props, worker_names[i]);
                if (p->lb_workers[i].lb_factor < 1) {
                    p->lb_workers[i].lb_factor = 1;
                }
                /* Calculate the maximum packet size from all workers
                 * for the recovery buffer.
                 */
                ms = jk_get_max_packet_size(props, worker_names[i]);
                if (ms > p->max_packet_size)
                    p->max_packet_size = ms;
                p->lb_workers[i].distance =
                    jk_get_distance(props, worker_names[i]);
                /* Route defaults to the worker name when not configured. */
                if ((s = jk_get_worker_route(props, worker_names[i], NULL)))
                    strncpy(p->lb_workers[i].route, s, JK_SHM_STR_SIZ);
                else
                    strncpy(p->lb_workers[i].route, worker_names[i], JK_SHM_STR_SIZ);
                if ((s = jk_get_worker_domain(props, worker_names[i], NULL)))
                    strncpy(p->lb_workers[i].domain, s, JK_SHM_STR_SIZ);
                if ((s = jk_get_worker_redirect(props, worker_names[i], NULL)))
                    strncpy(p->lb_workers[i].redirect, s, JK_SHM_STR_SIZ);

                p->lb_workers[i].s->lb_value = 0;
                p->lb_workers[i].s->state = JK_LB_STATE_IDLE;
                p->lb_workers[i].s->error_time = 0;
                p->lb_workers[i].activation =
                    jk_get_worker_activation(props, worker_names[i]);
                if (!wc_create_worker(p->lb_workers[i].name, 0,
                                      props,
                                      &(p->lb_workers[i].worker),
                                      we, l) || !p->lb_workers[i].worker) {
                    break;
                }
                /* Propagate the balancer-level secret to AJP members that
                 * have none of their own.
                 */
                if (secret && (p->lb_workers[i].worker->type == JK_AJP13_WORKER_TYPE ||
                    p->lb_workers[i].worker->type == JK_AJP14_WORKER_TYPE)) {
                    ajp_worker_t *aw =
(ajp_worker_t *)p->lb_workers[i].worker->worker_private;
                    if (!aw->secret)
                        aw->secret = secret;
                }
                /* An AJP member with port 0 can never be contacted;
                 * treat it as permanently stopped.
                 */
                if (p->lb_workers[i].worker->type == JK_AJP13_WORKER_TYPE ||
                    p->lb_workers[i].worker->type == JK_AJP14_WORKER_TYPE) {
                    ajp_worker_t *aw = (ajp_worker_t *)p->lb_workers[i].worker->worker_private;
                    if (aw->port == 0) {
                        p->lb_workers[i].activation = JK_LB_ACTIVATION_STOPPED;
                    }
                }
            }

            if (i != num_of_workers) {
                /* wc_create_worker broke out early: tear down the members
                 * that were already created.
                 */
                jk_log(l, JK_LOG_ERROR,
                       "Failed creating worker %s",
                       p->lb_workers[i].name);
                close_workers(p, i, l);
            }
            else {
                /* Update domain names if route contains period '.' */
                for (i = 0; i < num_of_workers; i++) {
                    if (!p->lb_workers[i].domain[0]) {
                        char *id_domain = strchr(p->lb_workers[i].route, '.');
                        if (id_domain) {
                            *id_domain = '\0';
                            strcpy(p->lb_workers[i].domain, p->lb_workers[i].route);
                            *id_domain = '.';
                        }
                    }
                    if (JK_IS_DEBUG_LEVEL(l)) {
                        jk_log(l, JK_LOG_DEBUG,
                               "Balanced worker %i has name %s and route %s in domain %s",
                               i,
                               p->lb_workers[i].name,
                               p->lb_workers[i].route,
                               p->lb_workers[i].domain);
                    }
                }
                p->num_of_workers = num_of_workers;
                update_mult(p, l);
                /* Routes identify members for stickiness, so duplicates
                 * are a fatal configuration error.
                 */
                for (i = 0; i < num_of_workers; i++) {
                    for (j = 0; j < i; j++) {
                        if (strcmp(p->lb_workers[i].route, p->lb_workers[j].route) == 0) {
                            jk_log(l, JK_LOG_ERROR,
                                   "Balanced workers number %i (%s) and %i (%s) share the same route %s - aborting configuration!",
                                   i,
                                   p->lb_workers[i].name,
                                   j,
                                   p->lb_workers[j].name,
                                   p->lb_workers[i].route);
                            JK_TRACE_EXIT(l);
                            return JK_FALSE;
                        }
                    }
                }
                JK_TRACE_EXIT(l);
                return JK_TRUE;
            }
        }
    }

    JK_LOG_NULL_PARAMS(l);
    JK_TRACE_EXIT(l);
    return JK_FALSE;
}

/*
 * Initialize runtime parameters of the load balancer after validate():
 * retry/recovery timing, balancing method and locking mode, session
 * cookie/path names, and the thread lock. Pushes the configuration to
 * shared memory via jk_lb_push(). Returns JK_TRUE on success.
 */
static int JK_METHOD init(jk_worker_t *pThis,
                          jk_map_t *props,
                          jk_worker_env_t *we, jk_logger_t *log)
{
    int i;

    lb_worker_t *p = (lb_worker_t *)pThis->worker_private;
    JK_TRACE_ENTER(log);

    p->worker.we = we;
    p->retries = jk_get_worker_retries(props, p->name,
                                       JK_RETRIES);
    p->retry_interval =
jk_get_worker_retry_interval(props, p->name, + JK_SLEEP_DEF); + p->recover_wait_time = jk_get_worker_recover_timeout(props, p->name, + WAIT_BEFORE_RECOVER); + if (p->recover_wait_time < 1) + p->recover_wait_time = 1; + p->error_escalation_time = jk_get_worker_error_escalation_time(props, p->name, + p->recover_wait_time / 2); + p->max_reply_timeouts = jk_get_worker_max_reply_timeouts(props, p->name, + 0); + p->maintain_time = jk_get_worker_maintain_time(props); + if(p->maintain_time < 0) + p->maintain_time = 0; + p->s->last_maintain_time = time(NULL); + p->s->last_reset = p->s->last_maintain_time; + + p->lbmethod = jk_get_lb_method(props, p->name); + p->lblock = jk_get_lb_lock(props, p->name); + strncpy(p->session_cookie, + jk_get_lb_session_cookie(props, p->name, JK_SESSION_IDENTIFIER), + JK_SHM_STR_SIZ); + strncpy(p->session_path, + jk_get_lb_session_path(props, p->name, JK_PATH_SESSION_IDENTIFIER), + JK_SHM_STR_SIZ); + strcpy(p->s->session_cookie, p->session_cookie); + strcpy(p->s->session_path, p->session_path); + + JK_INIT_CS(&(p->cs), i); + if (i == JK_FALSE) { + jk_log(log, JK_LOG_ERROR, + "creating thread lock (errno=%d)", + errno); + JK_TRACE_EXIT(log); + return JK_FALSE; + } + + p->sequence++; + jk_lb_push(p, JK_FALSE, log); + + JK_TRACE_EXIT(log); + return JK_TRUE; +} + +static int JK_METHOD get_endpoint(jk_worker_t *pThis, + jk_endpoint_t **pend, jk_logger_t *l) +{ + JK_TRACE_ENTER(l); + + if (pThis && pThis->worker_private && pend) { + lb_endpoint_t *p = (lb_endpoint_t *) malloc(sizeof(lb_endpoint_t)); + p->worker = pThis->worker_private; + p->endpoint.endpoint_private = p; + p->endpoint.service = service; + p->endpoint.done = done; + p->states = (int *)malloc((p->worker->num_of_workers + 1) * sizeof(int)); + if (!p->states) { + free(p); + jk_log(l, JK_LOG_ERROR, + "Failed allocating private worker state memory"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + *pend = &p->endpoint; + JK_TRACE_EXIT(l); + return JK_TRUE; + } + else { + 
JK_LOG_NULL_PARAMS(l); + } + + JK_TRACE_EXIT(l); + return JK_FALSE; +} + +static int JK_METHOD destroy(jk_worker_t **pThis, jk_logger_t *l) +{ + JK_TRACE_ENTER(l); + + if (pThis && *pThis && (*pThis)->worker_private) { + unsigned int i; + lb_worker_t *private_data = (*pThis)->worker_private; + + close_workers(private_data, private_data->num_of_workers, l); + JK_DELETE_CS(&(private_data->cs), i); + jk_close_pool(&private_data->p); + free(private_data); + + JK_TRACE_EXIT(l); + return JK_TRUE; + } + + JK_LOG_NULL_PARAMS(l); + JK_TRACE_EXIT(l); + return JK_FALSE; +} + +int JK_METHOD lb_worker_factory(jk_worker_t **w, + const char *name, jk_logger_t *l) +{ + JK_TRACE_ENTER(l); + + if (NULL != name && NULL != w) { + lb_worker_t *private_data = + (lb_worker_t *) calloc(1, sizeof(lb_worker_t)); + + + jk_open_pool(&private_data->p, + private_data->buf, + sizeof(jk_pool_atom_t) * TINY_POOL_SIZE); + + private_data->s = jk_shm_alloc_lb_worker(&private_data->p); + if (!private_data->s) { + free(private_data); + JK_TRACE_EXIT(l); + return 0; + } + strncpy(private_data->name, name, JK_SHM_STR_SIZ); + strncpy(private_data->s->h.name, name, JK_SHM_STR_SIZ); + private_data->lb_workers = NULL; + private_data->num_of_workers = 0; + private_data->worker.worker_private = private_data; + private_data->worker.validate = validate; + private_data->worker.init = init; + private_data->worker.get_endpoint = get_endpoint; + private_data->worker.destroy = destroy; + private_data->worker.maintain = maintain_workers; + private_data->recover_wait_time = WAIT_BEFORE_RECOVER; + private_data->error_escalation_time = private_data->recover_wait_time / 2; + private_data->max_reply_timeouts = 0; + private_data->sequence = 0; + private_data->s->h.sequence = 0; + private_data->next_offset = 0; + *w = &private_data->worker; + JK_TRACE_EXIT(l); + return JK_LB_WORKER_TYPE; + } + else { + JK_LOG_NULL_PARAMS(l); + } + + JK_TRACE_EXIT(l); + return 0; +} |