
  Hi Tom,

> Did you recently send me something re: logresolve? If so, please send it
> again, as I seem to have lost it.

Sure, here it is:


I started using your logresolve 2.0b8 a few months ago and ran into
a small problem. The stuff is great, especially the use of a database
to store lookups. However, I found that once it had to do the DNS
reverse lookup (i.e. the gethostbyaddr()) things got very slow. It
wasn't using CPU time, but just taking a long time. I quickly found
out that the problem was that the reverse lookups tend to take about
1 second, and since the lookups are done one at a time this slows
things way down.

So, I modified logresolve to spawn a (configurable) number of processes
and have them do the lookups in parallel. I typically use 20 processes
and get a linear speedup - instead of taking 30 hours to complete, things
are over in a little over an hour. There is one caveat: the order of
the lines in the log may change because resolving some addresses is
faster than others, and some may be in the database. However, I don't think
this should be a problem as logresolve is probably used mostly before
some statistics are done and there it does not matter what the order
is.

Since I thought others might find this useful, I was wondering if you were
interested in the changes? It's basically two new files, "lr-procs.c"
and "lr-procs.h", and a change to the main loop in logresolve.c. In
case you are, I've attached the new files and the diffs to this mail.
You're free to do whatever you like with it. If you have questions
about it (or just want a more detailed explanation of how I did things)
please let me know.


  Cheers,

  Ronald


----- lr-procs.c ---------------------------------------------------------
/***** **** *** ** *  *   *    *     *       *     *   *  * ** *** **** *****\

    logresolve 2.0 - http://www.net/~tomr/progs/logresolve/

    lr-procs.c - resolver processes

    Tom Rathborne - tomr@uunet.ca - http://www.net/~tomr/
    Ronald Tschalär - ronald@innovation.ch

\***** **** *** ** *  *   *    *     *       *     *   *  * ** *** **** *****/

#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>
#include <netdb.h>

#include "lr-procs.h"
#include "lr-ip.h"


/* maximum number of addresses that may be pending on one resolver process;
 * queue_addr() returns FALSE once every process has this many */
#define	MAX_QUEUE_LEN	10
/* type byte that starts every request and reply on a resolver channel:
 * T_ADDR - a bare 4-byte address to look up (reply is a new record)
 * T_REC  - an existing struct nsrec to re-look up (reply refreshes it) */
#define	T_ADDR		1
#define	T_REC		2


/* number of resolver processes and their per-process records; set up by
 * create_procs() and indexed 0..num_procs-1 by the other functions */
static int num_procs;
static proc_rec *proc_list;


/*
 * die - report a fatal error and terminate.
 *
 * Formats fmt and its arguments printf-style onto stderr, appends a
 * newline and ends the process with exit status 1. Never returns.
 */
static void die(const char *fmt, ...)
{
    va_list args;

    va_start(args, fmt);
    (void) vfprintf(stderr, fmt, args);
    va_end(args);

    (void) fputc('\n', stderr);
    exit(1);
}


/*
 * read exactly len bytes from fp into buf, retrying on short reads.
 *
 * Returns TRUE once all len bytes have been read, or FALSE if fread() hits
 * EOF or an error first (buf is then only partially filled).
 */
static int read_fully(FILE *fp, void *buf, size_t len)
{
    unsigned char *dst = buf;
    size_t         done = 0, got;

    /* keep the counters in size_t: an int counter is compared against the
     * unsigned len (sign-conversion) and would overflow for len > INT_MAX */
    while (done < len)
    {
	got = fread(dst + done, 1, len - done, fp);
	if (got == 0)
	    return FALSE;
	done += got;
    }

    return TRUE;
}


/*
 * main loop of a resolver process; never returns.
 *
 * Reads requests from the channel descriptor `pipe` and answers each one,
 * in order, on the same descriptor until the parent closes its end (EOF).
 *
 *   request: type byte, then the 4-byte address (T_ADDR) or a raw
 *            struct nsrec to re-look up (T_REC)
 *   reply:   type byte, the resolved struct nsrec, the hostname length
 *            (size_t, including the trailing NUL), the hostname bytes
 */
static void resolver(int pipe)
{
    FILE *in, *out;
    int   out_fd;
    unsigned char addr[4], type;
    struct nsrec  rec, *p_rec = NULL;
    size_t h_len;


    /* give each stream its own descriptor, so that closing one of them
     * (e.g. during exit) doesn't close the descriptor under the other */
    out_fd = dup(pipe);
    if (out_fd == -1)
	die("resolver: failed to duplicate pipe descriptor: %s", strerror(errno));

    in = fdopen(pipe, "r");
    out = fdopen(out_fd, "w");
    if (in == NULL  ||  out == NULL)
	die("resolver: failed to create streams for pipe: %s", strerror(errno));

    sethostent(TRUE);

    while (fread(&type, 1, 1, in) > 0)
    {
	switch(type)
	{
	    case T_ADDR:
		if (!read_fully(in, addr, sizeof(addr)))
		    die("resolver: error reading address from pipe: %s", strerror(errno));
		p_rec = ip_lookup(addr);
		break;
	    case T_REC:
		/* read_fully() returns TRUE on success, so only a failed
		 * read is fatal */
		if (!read_fully(in, &rec, sizeof(rec)))
		    die("resolver: error reading record from pipe: %s", strerror(errno));
		/* the parent sent its raw struct; this pointer refers to the
		 * parent's address space and must not be used or freed here */
		rec.hostname = NULL;
		p_rec = ip_relookup(&rec);
		break;
	    default:
		die("resolver: internal error - received invalid type %d", (int) type);
	}

	/* strlen(NULL) below would be undefined behaviour */
	if (p_rec == NULL  ||  p_rec->hostname == NULL)
	    die("resolver: lookup returned no host name");

	h_len = strlen(p_rec->hostname)+1;
	if (fwrite(&type, 1, 1, out) != 1  ||
	    fwrite(p_rec, 1, sizeof(*p_rec), out) != sizeof(*p_rec)  ||
	    fwrite(&h_len, 1, sizeof(h_len), out) != sizeof(h_len)  ||
	    fwrite(p_rec->hostname, 1, h_len, out) != h_len  ||
	    fflush(out) != 0)
	    die("resolver: error writing record to pipe: %s", strerror(errno));

	/* NOTE(review): assumes the lookup functions return a malloc'd
	 * hostname (the original code freed it too) - confirm in lr-ip.c */
	free(p_rec->hostname);
	/* ip_relookup() may hand back its argument; never free() the
	 * stack variable rec */
	if (p_rec != &rec)
	    free(p_rec);
    }

    endhostent();

    exit(0);
}


/*
 * fork one resolver process and fill in *proc with the parent's side of
 * the channel to it (pid, select()able descriptor, request and result
 * streams, empty queue). Dies on any failure.
 */
static void fork_proc(proc_rec *proc)
{
    int   fd[2];
    int   rfd;
    pid_t pid;


    /* requests and replies travel in both directions over one descriptor
     * per side. POSIX only guarantees a pipe to be one-way (on Linux the
     * child could not write back on its read end), so use a socket pair,
     * which is bidirectional at both ends. */
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd) == -1)
	die("error creating socket pair: %s", strerror(errno));

    switch(pid = fork())
    {
	case -1:	/* error */
	    die("failed to fork resolver process: %s", strerror(errno));
	    break;	/* not reached */
	case 0:		/* child */
	    close(fd[1]);
	    resolver(fd[0]);	/* never returns */
	    exit(1);		/* not reached */
	default:	/* parent */
	    close(fd[0]);
	    /* one descriptor per stream, so that fclose()ing both streams
	     * later doesn't close the same descriptor twice */
	    rfd = dup(fd[1]);
	    if (rfd == -1)
		die("main: failed to duplicate pipe descriptor: %s", strerror(errno));
	    proc->pid        = pid;
	    proc->pipe       = rfd;
	    proc->queue      = fdopen(fd[1], "w");
	    proc->dequeue    = fdopen(rfd, "r");
	    if (proc->queue == NULL  ||  proc->dequeue == NULL)
		die("main: failed to create streams for pipe: %s", strerror(errno));
	    /* turn off buffering of results: select() only sees data still in
	     * the descriptor, not data already read into a stdio buffer */
	    setvbuf(proc->dequeue, NULL, _IONBF, 0);
	    proc->head       = NULL;
	    proc->tail       = NULL;
	    proc->list_len   = 0;
    }
}


/*
 * create num resolver processes, setting up the channels to them, and
 * register end_procs() to run at exit.
 *
 * A num below 1 is treated as 1: queue_addr() indexes proc_list[0] even
 * when there are no processes, so starting none would later dereference
 * a NULL proc_list instead of failing cleanly.
 */
void create_procs(int num)
{
    int idx;

    if (num < 1)
	num = 1;

    proc_list = calloc(num, sizeof(*proc_list));
    if (proc_list == NULL)
	die("failed to malloc process records");
    num_procs = num;

    signal(SIGCHLD, SIG_IGN);	/* prevent zombies */
    atexit(end_procs);
    for (idx=0; idx<num_procs; idx++)
	fork_proc(&proc_list[idx]);
}

/*
 * end the resolver processes by closing both of our streams to each one.
 * Once no process holds the other end of a channel open any more, the
 * resolver's read loop sees EOF and exits.
 *
 * Entries with a non-positive pid were never started or are already closed
 * and are skipped. This makes repeated calls harmless, e.g. an explicit
 * call followed by the atexit() run.
 */
void end_procs(void)
{
    int i = 0;

    while (i < num_procs)
    {
	if (proc_list[i].pid <= 0)
	{
	    i++;
	    continue;
	}

	fclose(proc_list[i].queue);
	fclose(proc_list[i].dequeue);
	proc_list[i].pid = 0;
	i++;
    }
}


/*
 * attach a private copy of the log line `line` to the pending entry ent.
 *
 * The new element is pushed onto the front of ent->lines, so the list
 * holds the lines newest-first. Dies if memory cannot be allocated.
 */
static void add_line(entry_rec *ent, const char *line)
{
    line_elem *elem;
    size_t     size;

    elem = malloc(sizeof(*elem));
    if (elem == NULL)
	die("failed to malloc log line list entry");

    size = strlen(line) + 1;		/* include the terminating NUL */
    elem->line = malloc(size);
    if (elem->line == NULL)
	die("failed to malloc log line copy");
    memcpy(elem->line, line, size);

    elem->next = ent->lines;
    ent->lines = elem;
}


/*
 * hand a log line to the resolver processes.
 *
 * If some resolver already has addr pending, the line is just attached to
 * that entry. Otherwise a new entry is appended to the shortest queue (the
 * first such queue on ties), and a request is written to that resolver:
 * - T_ADDR plus the raw address when record is NULL;
 * - T_REC plus a copy of *record, so the existing record gets refreshed.
 *
 * Returns TRUE if the line was accepted. Returns FALSE, changing nothing,
 * if every queue already holds MAX_QUEUE_LEN entries. Dies if the request
 * cannot be written.
 */
int queue_addr(const unsigned char addr[], struct nsrec *record,
	       const char *line)
{
    entry_rec    *ent;
    FILE         *q;
    unsigned char type;
    int           i, best;

    /* first preference: piggy-back on a lookup that is already pending */
    for (i = 0; i < num_procs; i++)
    {
	unsigned left = proc_list[i].list_len;

	for (ent = proc_list[i].head; left > 0; ent = ent->next, left--)
	{
	    if (memcmp(addr, ent->addr, sizeof(ent->addr)) == 0)
	    {
		add_line(ent, line);
		return TRUE;
	    }
	}
    }

    /* otherwise pick the first resolver with the fewest pending entries */
    best = 0;
    for (i = 1; i < num_procs; i++)
    {
	if (proc_list[i].list_len < proc_list[best].list_len)
	    best = i;
    }

    if (proc_list[best].list_len >= MAX_QUEUE_LEN)
	return FALSE;		/* every queue is full */

    ent = malloc(sizeof(*ent));
    if (ent == NULL)
	die("failed to malloc resolver entry record");
    memcpy(ent->addr, addr, sizeof(ent->addr));
    ent->next   = NULL;
    ent->record = record;
    ent->lines  = NULL;
    add_line(ent, line);

    /* append to the resolver's FIFO; its replies come back in this order */
    if (proc_list[best].head == NULL)
	proc_list[best].head = ent;
    else
	proc_list[best].tail->next = ent;
    proc_list[best].tail = ent;
    proc_list[best].list_len++;

    q = proc_list[best].queue;
    if (record == NULL)
    {
	type = T_ADDR;
	if (fwrite(&type, 1, 1, q) != 1  ||
	    fwrite(addr, 1, sizeof(ent->addr), q) != sizeof(ent->addr)  ||
	    fflush(q) != 0)
	    die("main: error writing address to pipe: %s", strerror(errno));
    }
    else
    {
	type = T_REC;
	if (fwrite(&type, 1, 1, q) != 1  ||
	    fwrite(record, 1, sizeof(*record), q) != sizeof(*record)  ||
	    fflush(q) != 0)
	    die("main: error writing record to pipe: %s", strerror(errno));
    }

    return TRUE;
}


/*
 * dequeues a resolved name if any is available and prints all lines for
 * this name to outfile. If block is non-zero and at least one resolver
 * process has something queued then this will block till something is
 * dequeued. If an address lookup was dequeued then record is set to
 * point to the new record, else it's set to NULL. Returns 1 if anything
 * was dequeued, 0 otherwise.
 */
int dequeue_rec_and_print_lines(struct nsrec **record, FILE *outfile, int block)
{
    int idx, max, num;
    fd_set readfds, errfds;
    struct timeval noblock, *tv;


    for (num=0, idx=0; idx<num_procs; idx++)
	num += proc_list[idx].list_len;
    if (num == 0)  return FALSE;

    if (block)
	tv = NULL;
    else
    {
	memset(&noblock, 0, sizeof(noblock));
	tv = &noblock;
    }

    FD_ZERO(&readfds);
    FD_ZERO(&errfds);
    for (max=0, idx=0; idx<num_procs; idx++)
    {
	FD_SET(proc_list[idx].pipe, &readfds);
	FD_SET(proc_list[idx].pipe, &errfds);
	if (proc_list[idx].pipe > max)
	    max = proc_list[idx].pipe;
    }

    num = select(max+1, &readfds, NULL, &errfds, tv);
    if (num == -1)
	die("main: error on select: %s", strerror(errno));
    if (num == 0)
	return FALSE;
    
    for (idx=0; idx<num_procs; idx++)
    {
	if (FD_ISSET(proc_list[idx].pipe, &errfds))
	    die("main: error condition on pipe from process %d", proc_list[idx].pid);
    }

    for (idx=0; idx<num_procs; idx++)
    {
	if (FD_ISSET(proc_list[idx].pipe, &readfds))
	{
	    struct nsrec *nrec = (struct nsrec *) malloc(sizeof(struct nsrec)),
			 *orec;
	    entry_rec *ent;
	    line_elem *l, *n;
	    unsigned char type;
	    size_t h_len;

	    if (fread(&type, 1, 1, proc_list[idx].dequeue) != 1)
		die("main: error reading type from pipe: %s", strerror(errno));
	    if (!read_fully(proc_list[idx].dequeue, nrec, sizeof(*nrec))  ||
		!read_fully(proc_list[idx].dequeue, &h_len, sizeof(h_len))  ||
		(nrec->hostname = malloc(h_len)) == NULL  ||
		!read_fully(proc_list[idx].dequeue, nrec->hostname, h_len))
		die("main: error reading record from pipe: %s", strerror(errno));

	    ent = proc_list[idx].head;
	    proc_list[idx].head = ent->next;
	    if (ent == proc_list[idx].tail)
		proc_list[idx].tail = NULL;
	    proc_list[idx].list_len--;

	    orec = ent->record;

	    if (!(type == T_ADDR  &&  orec == NULL)  &&
		!(type == T_REC   &&  orec != NULL))
		die("main: internal error - received unexpected type %d", (int) type);

	    l = ent->lines;
	    while (l != NULL)
	    {
		fprintf(outfile, "%s %s\n", nrec->hostname, l->line);
		n = l->next;
		free(l->line);
		free(l);
		l = n;
	    }

	    free(ent);

	    switch (type)
	    {
		case T_ADDR:
		    *record = nrec;
		    return TRUE;
		case T_REC:
		    free(ent->record->hostname);
		    orec->hostname   = nrec->hostname;
		    orec->status     = nrec->status;
		    orec->tries      = nrec->tries;
		    orec->lookuptime = nrec->lookuptime;
		    free(nrec);
		    *record = NULL;
		    return TRUE;
		default:
		    die("main: internal error - received invalid type %d", (int) type);
	    }
	}
    }

    /* should never be reached */
    return FALSE;
}


/*** end of lr-procs.c ***/
----- lr-procs.h ---------------------------------------------------------
/***** **** *** ** *  *   *    *     *       *     *   *  * ** *** **** *****\

    logresolve 2.0 - http://www.net/~tomr/progs/logresolve/

    lr-procs.h - resolver processes

    Tom Rathborne - tomr@uunet.ca - http://www.net/~tomr/
    Ronald Tschalär - ronald@innovation.ch

\***** **** *** **        *    *     *       *     *   *  * ** *** **** *****/

#ifndef LR_PROCS_H
#define LR_PROCS_H

#include "lr-nsrec.h"
#include "logresolve.h"

/* 
 *  struct line - a log line in a singly linked list
 *
 *  next - next line element in linked list
 *  line - log line (a malloc'd copy, freed once the line has been printed)
 */
typedef struct line {
    struct line *next;
    char        *line;
} line_elem;

/* 
 *  struct entry - a resolver entry in a singly linked list; one per
 *  address currently pending on a resolver process
 *
 *  next - next entry in linked list
 *  addr - the ip-address to be resolved (4 bytes)
 *  record - NULL for a fresh lookup; otherwise an existing record that is
 *           updated in place when the re-lookup result arrives
 *  lines - pointer to linked list of line elements waiting for this
 *          address to be resolved
 */
typedef struct entry {
    struct entry  *next;
    unsigned char  addr[4];
    struct nsrec  *record;
    line_elem     *lines;
} entry_rec;

/* 
 *  struct proc - a per process record
 *
 *  pid - process id of the resolver (reset to 0 once end_procs() has
 *        closed its streams)
 *  pipe - descriptor of our end of the channel to the resolver; this is
 *         the descriptor dequeue_rec_and_print_lines() select()s on
 *  queue - write stream for sending resolver requests
 *  dequeue - unbuffered read stream for receiving resolver results
 *  head - head of singly linked list of queued resolver entries, i.e. the
 *         entry whose result will arrive next
 *  tail - tail of singly linked list of queued resolver entries
 *  list_len - length of linked list of queued resolver entries
 */
typedef struct proc {
    pid_t      pid;
    int        pipe;
    FILE      *queue;
    FILE      *dequeue;
    entry_rec *head;
    entry_rec *tail;
    unsigned   list_len;
} proc_rec;


/*
 * create num resolver processes, setting up the channels to them and
 * registering end_procs() to run at exit
 */
void create_procs(int num);

/*
 * end the resolver processes by closing our ends of their channels; each
 * resolver exits when its read loop sees EOF. Safe to call more than once.
 */
void end_procs(void);

/*
 * queue an address for resolving. If the address is already queued the line
 * is just added to the list of lines to be processed when the address is
 * resolved. Else if the address is not already queued and some process has
 * not reached its maximum queue length then the address is queued for
 * resolving by the process with the shortest queue. In either of these
 * cases the function returns TRUE. If the address could not be queued
 * because all processes have reached their maximum queue lengths then the
 * function just returns FALSE.
 *
 * record is NULL to look up a new address, or an existing record whose
 * contents are sent to the resolver and which is refreshed in place when
 * the result arrives. line is copied.
 */
int queue_addr(const unsigned char addr[], struct nsrec *record,
	       const char *line);

/*
 * dequeues a resolved name if any is available and prints all lines for
 * this name to outfile. If block is non-zero and at least one resolver
 * process has something queued, this blocks until something is dequeued;
 * otherwise it does not block. If a new address lookup was dequeued,
 * *record is set to point to the new record; if an existing record was
 * refreshed, *record is set to NULL. Returns TRUE if anything was
 * dequeued, FALSE otherwise.
 */
int dequeue_rec_and_print_lines(struct nsrec **record, FILE *outfile, int block);


#endif

/*** end of lr-procs.h ***/
----- context diffs ------------------------------------------------------
*** logresolve.c.orig	Wed Jul 23 08:10:46 1997
--- logresolve.c	Mon Oct 12 23:47:13 1998
***************
*** 24,29 ****
--- 24,30 ----
  #include "logresolve.h"
  
  #include "lr-nsrec.h"
+ #include "lr-procs.h"
  #include "lr-stats.h"
  #include "lr-ip.h"
  #include "lr-hash.h"
***************
*** 104,109 ****
--- 105,111 ----
      opt->stats = LR_STATS;
      opt->buckets = LR_BUCKETS;
      opt->nice = LR_NICE;
+     opt->num_procs = LR_NUM_PROCS;
  
      for (arg = 1; arg < argc; arg++) {
  	option = argv[arg];
***************
*** 177,182 ****
--- 179,190 ----
  		case 'k':
  		    opt->keep = plus;
  		    break;
+ 		case 'p':
+ 		    option++;
+ 		    if ((!*option) && (arg != argc))
+ 			option = argv[++arg];
+ 		    opt->num_procs = atoi(option);
+ 		    break;
  		default:
  		    die_error("Invalid command line option!\n");
  	    }
***************
*** 226,233 ****
      table = hash_new(opt.buckets);
      line = (char *) malloc(MAXLINE);
  
-     sethostent(TRUE);
- 
      if (!strcmp(opt.inname, "-")) {
  	infile = stdin;
      } else {
--- 234,239 ----
***************
*** 244,249 ****
--- 250,257 ----
  	    die_error("Couldn't open output file!\n");
      }
  
+     create_procs(opt.num_procs);
+ 
  /* Begin main loop. */
  
      while (!getline(line, MAXLINE, infile)) {
***************
*** 268,294 ****
  	    chop++;
  	}
  	if (str_ip(line, ipnum)) {
! 	    if (!(record = hash_lookup(table, ipnum))) {
  #ifdef LR_DBM
! 		if (!(record = pdbm_lookup(lrdbm, ipnum)))
  #endif
! 		{
! 		    record = ip_lookup(ipnum);
! 		}
! 		record->usages++;
  
  		hash_insert(table, record);
  	    }
! 	    if ((record->lookuptime < (time(NULL) - opt.expire))
! 		|| ((record->status) && (record->lookuptime < (time(NULL) - opt.retry)))
! 		|| ((record->status == TRY_AGAIN)
! 		    && (record->tries <= opt.tries))) {
! 		ip_relookup(record);
! 	    }
! 	    fprintf(outfile, "%s %s\n", record->hostname, chop);
  
! 	} else {
! 	    fprintf(outfile, "%s %s\n", line, chop);
  	}
      }
  
--- 276,322 ----
  	    chop++;
  	}
  	if (str_ip(line, ipnum)) {
! 
! 	    if ((!(record = hash_lookup(table, ipnum))
  #ifdef LR_DBM
! 		&& !(record = pdbm_lookup(lrdbm, ipnum))
  #endif
! 		)
! 		||
! 		 ((record->lookuptime < (time(NULL) - opt.expire))
! 		 || ((record->status) && (record->lookuptime < (time(NULL) - opt.retry)))
! 		 || ((record->status == TRY_AGAIN)
! 		    && (record->tries <= opt.tries)))
! 		) {
! 		int queued;
! 		do {
! 		    queued = queue_addr(ipnum, NULL, chop);
! 		    if (!queued)
! 			if (dequeue_rec_and_print_lines(&record, outfile, 1)  &&
! 			    record != NULL) {
! 			    record->usages++;
! 			    hash_insert(table, record);
! 			}
! 		} while (!queued);
! 	    } else {
! 		fprintf(outfile, "%s %s\n", record->hostname, chop);
! 	    }
! 	} else {
! 	    fprintf(outfile, "%s %s\n", line, chop);
! 	}
  
+ 	while (dequeue_rec_and_print_lines(&record, outfile, 0)) {
+ 	    if (record != NULL) {
+ 		record->usages++;
  		hash_insert(table, record);
  	    }
! 	}
!     }
  
!     while (dequeue_rec_and_print_lines(&record, outfile, 1)) {
! 	if (record != NULL) {
! 	    record->usages++;
! 	    hash_insert(table, record);
  	}
      }
  
***************
*** 297,303 ****
      fclose(infile);
      fclose(outfile);
  
!     endhostent();
  
  #ifdef LR_DBM
      pdbm_close(lrdbm);
--- 325,331 ----
      fclose(infile);
      fclose(outfile);
  
!     end_procs();
  
  #ifdef LR_DBM
      pdbm_close(lrdbm);
*** logresolve.h.orig	Wed Jul 23 08:10:46 1997
--- logresolve.h	Sun Oct 11 01:11:45 1998
***************
*** 77,82 ****
--- 77,86 ----
  
  #define LR_NICE 1
  
+ /* Default number of processes to start for resolving non-cached host names */
+ 
+ #define LR_NUM_PROCS 20
+ 
  /* maximum line length in bytes, including \0 at the end */
  
  #define MAXLINE 4096
***************
*** 102,107 ****
--- 106,112 ----
      unsigned int count;		/* count - append count? */
      unsigned int buckets;	/* buckets in hash table */
      unsigned int nice;		/* nice level of process */
+     unsigned int num_procs;	/* number of resolver processes */
  };
  
  /* Boolean constants - probably defined elsewhere but hey - what the heck! */
*** logresolve.txt.orig	Wed Jul 23 08:10:46 1997
--- logresolve.txt	Sun Nov 29 22:42:13 1998
***************
*** 55,60 ****
--- 55,61 ----
   -d : <DBM file> to use for persistent cache
   -l : <lockfile> to use to maintain exclusivity on DBM file if necessary.
   -c : <countfile> to *append* total bytecount to
+  -p : <num procs> to use for the reverse address lookups
  
  The ghosts of options yet to be implemented:
  
*** Makefile.orig	Wed Jul 23 08:10:46 1997
--- Makefile	Mon Oct 12 21:03:25 1998
***************
*** 66,72 ****
  ######## You shouldn't have to fiddle with anything below here! ########
  
  TARGETS = logresolve
! SOURCES = logresolve.c lr-dbm.c lr-hash.c lr-ip.c lr-nsrec.c lr-stats.c
  OBJECTS = $(SOURCES:.c=.o)
  DOCS = logresolve.txt
  
--- 66,72 ----
  ######## You shouldn't have to fiddle with anything below here! ########
  
  TARGETS = logresolve
! SOURCES = logresolve.c lr-dbm.c lr-hash.c lr-ip.c lr-nsrec.c lr-stats.c lr-procs.c
  OBJECTS = $(SOURCES:.c=.o)
  DOCS = logresolve.txt
  
***************
*** 74,80 ****
    # in the Makefile are guaranteed to cause a full rebuild, just to be safe.
  
  ALLDEPS = logresolve.h Makefile
! INCLUDES = logresolve.h lr-dbm.h lr-dbm-flock.h lr-hash.h lr-ip.h lr-nsrec.h lr-stats.h
  
  .SUFFIXES: .c .o
  
--- 74,80 ----
    # in the Makefile are guaranteed to cause a full rebuild, just to be safe.
  
  ALLDEPS = logresolve.h Makefile
! INCLUDES = logresolve.h lr-dbm.h lr-dbm-flock.h lr-hash.h lr-ip.h lr-nsrec.h lr-stats.h lr-procs.h
  
  .SUFFIXES: .c .o
  
***************
*** 108,113 ****
--- 108,114 ----
  lr-nsrec.o: lr-nsrec.c lr-nsrec.h
  lr-stats.o: lr-stats.c lr-stats.h
  lr-ip.o: lr-ip.c lr-ip.h lr-nsrec.h
+ lr-procs.o: lr-procs.c lr-procs.h lr-ip.h lr-nsrec.h
  lr-hash.o: lr-hash.c lr-hash.h lr-dbm.h lr-nsrec.h
  lr-dbm.o: lr-dbm.c lr-dbm.h lr-dbm-flock.h lr-nsrec.h
  
--------------------------------------------------------------------------


