runqueue restructuring

This commit is contained in:
Jong Hyuk Choi 2003-05-07 23:52:08 +00:00
parent 580ae073e1
commit 35d1bd68b9
5 changed files with 25 additions and 24 deletions

View file

@ -7,7 +7,8 @@ typedef struct re_s {
struct timeval interval; struct timeval interval;
LDAP_STAILQ_ENTRY(re_s) tnext; /* it includes running */ LDAP_STAILQ_ENTRY(re_s) tnext; /* it includes running */
LDAP_STAILQ_ENTRY(re_s) rnext; LDAP_STAILQ_ENTRY(re_s) rnext;
void *private; ldap_pvt_thread_start_t *routine;
void *arg;
} re_t; } re_t;
typedef struct runqueue_s { typedef struct runqueue_s {
@ -20,7 +21,8 @@ LDAP_F( void )
ldap_pvt_runqueue_insert( ldap_pvt_runqueue_insert(
struct runqueue_s* rq, struct runqueue_s* rq,
time_t interval, time_t interval,
void *private ldap_pvt_thread_start_t* routine,
void *arg
); );
LDAP_F( void ) LDAP_F( void )

View file

@ -20,7 +20,8 @@ void
ldap_pvt_runqueue_insert( ldap_pvt_runqueue_insert(
struct runqueue_s* rq, struct runqueue_s* rq,
time_t interval, time_t interval,
void *private ldap_pvt_thread_start_t *routine,
void *arg
) )
{ {
struct re_s* entry; struct re_s* entry;
@ -30,7 +31,8 @@ ldap_pvt_runqueue_insert(
entry->interval.tv_usec = 0; entry->interval.tv_usec = 0;
entry->next_sched.tv_sec = time( NULL ); entry->next_sched.tv_sec = time( NULL );
entry->next_sched.tv_usec = 0; entry->next_sched.tv_usec = 0;
entry->private = private; entry->routine = routine;
entry->arg = arg;
LDAP_STAILQ_INSERT_HEAD( &rq->task_list, entry, tnext ); LDAP_STAILQ_INSERT_HEAD( &rq->task_list, entry, tnext );
} }

View file

@ -365,7 +365,7 @@ int backend_startup(Backend *be)
syncinfo_t *si = ( syncinfo_t * ) backendDB[i].syncinfo; syncinfo_t *si = ( syncinfo_t * ) backendDB[i].syncinfo;
ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex ); ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
ldap_pvt_runqueue_insert( &syncrepl_rq, si->interval, ldap_pvt_runqueue_insert( &syncrepl_rq, si->interval,
(void *) &backendDB[i] ); do_syncrepl, (void *) &backendDB[i] );
ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex ); ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
} }
#endif #endif

View file

@ -1297,12 +1297,25 @@ slapd_daemon_task(
at = ldap_pvt_thread_pool_backload(&connection_pool); at = ldap_pvt_thread_pool_backload(&connection_pool);
#ifdef LDAP_SYNCREPL #ifdef LDAP_SYNCREPL
/* cat is of struct timeval containing the earliest schedule */
ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex ); ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
rtask = ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat ); rtask = ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat );
while ( cat && cat->tv_sec && cat->tv_sec <= now ) {
if ( ldap_pvt_runqueue_isrunning( &syncrepl_rq, rtask )) {
ldap_pvt_runqueue_resched( &syncrepl_rq, rtask );
} else {
ldap_pvt_runqueue_runtask( &syncrepl_rq, rtask );
ldap_pvt_runqueue_resched( &syncrepl_rq, rtask );
ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
ldap_pvt_thread_pool_submit( &connection_pool,
rtask->routine, (void *) rtask );
}
rtask = ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat );
}
rtask = ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat );
ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex ); ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
if ( cat != NULL ) { if ( cat != NULL ) {
diff.tv_sec = cat->tv_sec - slap_get_time(); diff.tv_sec = difftime( cat->tv_sec, now );
if ( diff.tv_sec == 0 ) if ( diff.tv_sec == 0 )
diff.tv_sec = tdelta; diff.tv_sec = tdelta;
} }
@ -1386,22 +1399,6 @@ slapd_daemon_task(
0, 0, 0 ); 0, 0, 0 );
#endif #endif
#ifdef LDAP_SYNCREPL
ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
rtask = ldap_pvt_runqueue_next_sched( &syncrepl_rq, &cat );
if ( ldap_pvt_runqueue_isrunning( &syncrepl_rq, rtask )) {
ldap_pvt_runqueue_resched( &syncrepl_rq, rtask );
ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
} else if ( cat && cat->tv_sec && cat->tv_sec <= slap_get_time() ) {
ldap_pvt_runqueue_runtask( &syncrepl_rq, rtask );
ldap_pvt_runqueue_resched( &syncrepl_rq, rtask );
ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
ldap_pvt_thread_pool_submit( &connection_pool,
do_syncrepl, (void *) rtask );
} else {
ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
}
#endif
ldap_pvt_thread_yield(); ldap_pvt_thread_yield();
continue; continue;

View file

@ -119,7 +119,7 @@ do_syncrepl(
void *arg ) void *arg )
{ {
struct re_s* rtask = arg; struct re_s* rtask = arg;
Backend *be = rtask->private; Backend *be = rtask->arg;
syncinfo_t *si = ( syncinfo_t * ) be->syncinfo; syncinfo_t *si = ( syncinfo_t * ) be->syncinfo;
SlapReply rs = {REP_RESULT}; SlapReply rs = {REP_RESULT};