Logo Search packages:      
Sourcecode: lbzip2 version File versions  Download package

main.c

/* main.c,v 1.51 2009-10-29 01:31:12 lacos Exp */

#include <unistd.h>          /* unlink() */
#include <sys/types.h>       /* kill() */
#include <signal.h>          /* kill() */
#include <stdlib.h>          /* EXIT_FAILURE */
#include <stdarg.h>          /* va_list */
#include <stdio.h>           /* flockfile() */
#include <string.h>          /* strerror() */
#include <errno.h>           /* errno */
#include <assert.h>          /* assert() */
#include <sys/stat.h>        /* stat() */
#include <fcntl.h>           /* open() */

#include "main.h"            /* pname */
#include "lbunzip2_single.h" /* lbunzip2_single_wrap() */
#include "lbunzip2.h"        /* lbunzip2_wrap() */
#include "lbzip2.h"          /* lbzip2_wrap() */


/* Private stuff needed by fatal(). */

/*
  The thread that runs main(), recorded at startup. fatal() compares
  pthread_self() against it to decide between main-thread cleanup and
  sub-thread notification.
*/
static pthread_t main_thread;

/*
  Pathname of the current regular file being written to, or null when there is
  none. Only touched on the main thread; fatal() unlinks it there so that no
  partial output file is left behind.
*/
static char *opathn;

/* Our own process ID, recorded at startup; kill() targets it. */
static pid_t pid;


/* Public utility variables and functions. */

/* Program name: the last pathname component of argv[0]. */
const char *pname;

/*
  General allocator hooks: malloc()/free(), or trace_malloc()/trace_free()
  when LBZIP2_TRACE_ALLOC is set to a non-empty value.
*/
void *(*mallocf)(size_t size);
void (*freef)(void *ptr);

/*
  Allocator hooks with a bzalloc/bzfree-like signature. They are only set
  (to the tracing variants) when allocation tracing is on, and stay null
  otherwise. NOTE(review): presumably passed to libbzip2, where null selects
  its default allocator -- confirm in the (de)compressor sources.
*/
void *(*lbzallocf)(void *ignored, int n, int m);
void (*lbzfreef)(void *ignored, void *ptr);


/*
  Terminate after an error; callable from any thread.

  On the main thread, remove the partially written output file (if any) and
  exit with EXIT_FAILURE. On any other thread, notify the main thread by
  sending SIGUSR1 to the process and end only the calling thread. If that
  notification cannot be sent, exit the process directly.
*/
void
fatal(void)
{
  if (!pthread_equal(pthread_self(), main_thread)) {
    if (0 == kill(pid, SIGUSR1)) {
      pthread_exit(0);
    }
  }
  else if (0 != opathn) {
    (void)unlink(opathn);
  }

  _exit(EXIT_FAILURE);
}


/*
  Print "PNAME: <formatted message>: <strerror(err)>" to stderr, then call
  fatal(). Does not return.
*/
void
fail(const char *fmt, int err, ...)
{
  va_list ap;

  /*
    Keep the whole message in one piece. Holding the stderr lock is also
    meant to serialize the strerror() call between threads.
  */
  flockfile(stderr);
  {
    (void)fprintf(stderr, "%s: ", pname);
    va_start(ap, err);
    (void)vfprintf(stderr, fmt, ap);
    va_end(ap);
    (void)fprintf(stderr, ": %s\n", strerror(err));
  }
  funlockfile(stderr);

  /* stderr is never fully buffered originally, so nothing is lost here. */
  fatal();
}


/* Allocate "size" bytes through (*mallocf)(); a failure is fatal. */
void *
xalloc(size_t size)
{
  void *mem;

  mem = (*mallocf)(size);
  if (0 != mem) {
    return mem;
  }

  fail("(*mallocf)()", errno);
  return mem;
}


/*
  Initialize "cond": an error-checking mutex, a default condition variable,
  and zeroed statistics counters. Any failure is fatal.
*/
void
xinit(struct cond *cond)
{
  pthread_mutexattr_t attr;
  int err;

  err = pthread_mutexattr_init(&attr);
  if (err != 0) {
    fail("pthread_mutexattr_init()", err);
  }

  /* Error-checking type: misuse of the lock is reported instead of ignored. */
  err = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
  if (err != 0) {
    fail("pthread_mutexattr_settype()", err);
  }

  err = pthread_mutex_init(&cond->lock, &attr);
  if (err != 0) {
    fail("pthread_mutex_init()", err);
  }

  /* The attribute object is no longer needed once the mutex exists. */
  err = pthread_mutexattr_destroy(&attr);
  if (err != 0) {
    fail("pthread_mutexattr_destroy()", err);
  }

  err = pthread_cond_init(&cond->cond, 0);
  if (err != 0) {
    fail("pthread_cond_init()", err);
  }

  cond->wcount = 0lu;
  cond->ccount = 0lu;
}


/* Destroy the condition variable, then the mutex of "cond"; failure is fatal. */
void
xdestroy(struct cond *cond)
{
  int err = pthread_cond_destroy(&cond->cond);

  if (err != 0) {
    fail("pthread_cond_destroy()", err);
  }

  err = pthread_mutex_destroy(&cond->lock);
  if (err != 0) {
    fail("pthread_mutex_destroy()", err);
  }
}


/* Lock the mutex of "cond"; failure is fatal. */
void
xlock(struct cond *cond)
{
  int err = pthread_mutex_lock(&cond->lock);

  if (err != 0) {
    fail("pthread_mutex_lock()", err);
  }
}


/* Lock "cond" and bump its ccount counter (one more acquisition). */
void
xlock_pred(struct cond *cond)
{
  xlock(cond);
  cond->ccount++;
}


/* Unlock the mutex of "cond"; failure is fatal. */
void
xunlock(struct cond *cond)
{
  int err = pthread_mutex_unlock(&cond->lock);

  if (err != 0) {
    fail("pthread_mutex_unlock()", err);
  }
}


/*
  Wait on "cond" (whose mutex the caller holds). Bumps wcount before waiting
  and ccount after reacquiring the lock. Failure is fatal.
*/
void
xwait(struct cond *cond)
{
  int err;

  cond->wcount++;
  err = pthread_cond_wait(&cond->cond, &cond->lock);
  if (err != 0) {
    fail("pthread_cond_wait()", err);
  }
  cond->ccount++;
}


/* Wake one waiter of "cond"; failure is fatal. */
void
xsignal(struct cond *cond)
{
  int err = pthread_cond_signal(&cond->cond);

  if (err != 0) {
    fail("pthread_cond_signal()", err);
  }
}


/* Wake all waiters of "cond"; failure is fatal. */
void
xbroadcast(struct cond *cond)
{
  int err = pthread_cond_broadcast(&cond->cond);

  if (err != 0) {
    fail("pthread_cond_broadcast()", err);
  }
}


/* Start "routine(arg)" in a new thread with default attributes; failure is fatal. */
void
xcreate(pthread_t *thread, void *(*routine)(void *), void *arg)
{
  int err = pthread_create(thread, 0, routine, arg);

  if (err != 0) {
    fail("pthread_create()", err);
  }
}


/* Join "thread", discarding its exit value; failure is fatal. */
void
xjoin(pthread_t thread)
{
  int err = pthread_join(thread, 0);

  if (err != 0) {
    fail("pthread_join()", err);
  }
}


/* Send "sig" to this process; failure is fatal. */
void
xraise(int sig)
{
  if (kill(pid, sig) == -1) {
    fail("kill()", errno);
  }
}


/* Private stuff part 2. */

/* Environment variable that turns on allocation tracing when non-empty. */
static const char ev_trace[] = "LBZIP2_TRACE_ALLOC";

/* Where the produced data goes. */
enum outmode
{
  OM_STDOUT,  /* -c: everything is written to stdout. */
  OM_DISCARD, /* -t: the output is thrown away. */
  OM_REGF     /* Neither -c nor -t: one output regular file per operand. */
};

/* Settings gathered from the environment and the command line. */
struct opts
{
  unsigned num_worker;  /* -n: number of worker threads to start. */
  enum outmode outmode; /* -c/-t: output disposition. */
  int decompress;       /* -d/-z: nonzero selects bunzip2 mode. */
  int bs100k;           /* -1 .. -9: compression block size, in 100K units. */
  int verbose;          /* -v: log each (de)compression start. */
  int print_cctrs;      /* -S: print condition variable counters. */
};

static const char version[] = "0.17";

/* Backlog factor shared by all workers together. */
static const unsigned blf = 4u;

/* The other environment variables whose tokens are prepended to argv. */
static const char * const ev_name[] = { "LBZIP2", "BZIP2", "BZIP" };

/* Token separators inside those environment values (no escaping). */
static const char envsep[] = " \t";


/*
  Parse "str" as a base-10 long in [lower, upper]. "source" names where the
  string came from, for the error message. Invalid input is fatal.
*/
static long
xstrtol(const char *str, const char *source, long lower, long upper)
{
  char *end;
  long val;
  int ok;

  errno = 0;
  val = strtol(str, &end, 10);

  /* Non-empty, fully consumed, no range error, and within the bounds. */
  ok = '\0' != *str && '\0' == *end && 0 == errno
      && lower <= val && val <= upper;

  if (!ok) {
    (void)fprintf(stderr, "%s: failed to parse \"%s\" from %s as a long in"
        " [%ld..%ld], specify \"-h\" for help\n", pname, str, source, lower,
        upper);
    fatal();
  }

  return val;
}


/*
  Print the help text to stderr and exit successfully. If writing the help
  fails, call fatal() instead.

  "mx_worker" is the upper bound accepted for "-n". The text is split across
  several fprintf() calls so that each string literal stays small.
*/
static void
usage(unsigned mx_worker)
{
  (void)fprintf(stderr,
    "lbzip2: Parallel bzip2 utility.\n"
    "Copyright (C) 2008, 2009 Laszlo Ersek. Released under the GNU GPLv2+.\n"
    "Version %s.\n"
    "\n"
    "Usage:\n"
    "1. PROG [-n WORKER-THREADS] [-c|-t] [-d|-z] [-1 .. -9] [-v] [-S] {FILE}\n"
    "2. PROG -h\n"
    "\n"
    "Recognized PROG names:\n"
    "  bunzip2, lbunzip2  : Decompress. Forceable with \"-d\".\n"
    "  bzcat, lbzcat      : Decompress to stdout. Forceable with \"-cd\".\n"
    "  <otherwise>        : Compress. Forceable with \"-z\".\n"
    "\n"
    "Environment variables:\n"
    "  LBZIP2, BZIP2,\n", version);

  (void)fprintf(stderr,
    "  BZIP               : Insert arguments between PROG and the rest of the\n"
    "                       command line. Tokens are separated by spaces and\n"
    "                       tabs; no escaping.\n"
    "  LBZIP2_TRACE_ALLOC : If set to a non-empty value, print a memory\n"
    "                       allocation trace to stderr. Check trace with\n"
    "                       \"malloc_trace.pl\".\n"
    "\n"
    "Options:\n"
    "  -n WORKER-THREADS  : Set the number of (de)compressor threads to\n"
    "                       WORKER-THREADS. WORKER-THREADS must be in [1,\n");

  (void)fprintf(stderr,
    "                       %u]. If this option is not specified, lbzip2\n"
#ifdef _SC_NPROCESSORS_ONLN
    "                       queries the system for the number of online\n"
    "                       processors.\n"
#else
    "                       exits with an error.\n"
#endif
    "  -c, --stdout       : Write output to stdout even with FILE operands.\n"
    "                       Incompatible with \"-t\".\n"
    "  -t, --test         : Test decompression; discard output instead of\n"
    "                       writing it to files or stdout. Incompatible with\n"
    "                       \"-c\".\n",
    mx_worker);

  (void)fprintf(stderr,
    "  -d, --decompress   : Force decompression over the selection by PROG.\n"
    "  -z, --compress     : Force compression over the selection by PROG.\n"
    "  -1 .. -9           : Set the compression block size to 100K .. 900K.\n"
    "  --fast             : Alias for \"-1\".\n"
    "  --best             : Alias for \"-9\". This is the default.\n"
    "  -v, --verbose      : Log each (de)compression start to stderr.\n"
    "  -S                 : Print condition variable statistics to stderr.\n");

  (void)fprintf(stderr,
    "  -f, --force        : Accepted for compatibility, otherwise ignored;\n"
    "                       lbzip2 never overwrites files.\n"
    "  -k, --keep         : Accepted for compatibility, otherwise ignored;\n"
    "                       lbzip2 never removes input files.\n"
    "  -s, --small,\n"
    "  -q, --quiet,\n"
    "  --repetitive-fast,\n"
    "  --repetitive-best  : Accepted for compatibility, otherwise ignored.\n"
    "  -h, --help,\n"
    "  -L, --license,\n"
    "  -V, --version      : Print this help and exit successfully.\n"
    "\n"
    "Operands:\n");

  (void)fprintf(stderr,
    "  FILE               : Specify files to compress or decompress. If no\n"
    "                       FILE is given, work as a filter. FILEs with\n"
    "                       \".bz2\", \".tbz\", \".tbz2\" and \".tz2\" name\n"
    "                       suffixes will be skipped when compressing. When\n"
    "                       decompressing, \".bz2\" suffixes will be removed\n"
    "                       in output filenames; \".tbz\", \".tbz2\" and\n"
    "                       \".tz2\" suffixes will be replaced by \".tar\";\n"
  );

  (void)fprintf(stderr,
    "                       other filenames will be suffixed with \".out\".\n"
  );

  /* Report a failure to print the help text through the exit status. */
  if (ferror(stderr)) {
    fatal();
  }
  _exit(EXIT_SUCCESS);
}


/* Node of the singly linked list of arguments (options and operands). */
struct arg
{
  struct arg *next; /* Following argument; null terminates the list. */
  const char *val;  /* Text of this argument. */
};


static void
opts_outmode(struct opts *opts, char ch)
{
  assert('c' == ch || 't' == ch);
  if (('c' == ch ? OM_DISCARD : OM_STDOUT) == opts->outmode) {
    (void)fprintf(stderr, "%s: \"-c\" and \"-t\" are incompatible, specify"
        " \"-h\" for help\n", pname);
    fatal();
  }
  if ('c' == ch) {
    opts->outmode = OM_STDOUT;
  }
  else {
    opts->outmode = OM_DISCARD;
    opts->decompress = 1;
  }
}


/*
  Apply "-d" (ch == 'd') or "-z" (ch == 'z'). An explicit -d/-z cancels a
  previously selected discard mode ("-t") and reverts to writing regular files.
*/
static void
opts_decompress(struct opts *opts, char ch)
{
  assert('z' == ch || 'd' == ch);
  if (opts->outmode == OM_DISCARD) {
    opts->outmode = OM_REGF;
  }
  opts->decompress = (ch == 'd');
}


/*
  Append a new node to the argument list at "*link_at". The node's value is a
  private, NUL-terminated copy of the "len" characters starting at "str". The
  copy lives in the same allocation, directly after the node, so releasing the
  node with (*freef)() releases the string as well.

  Returns the link field of the new node, i.e. where the next node goes.
*/
static struct arg **
arg_append_copy(struct arg **link_at, const char *str, size_t len)
{
  struct arg *arg;
  char *copy;

  if ((size_t)-1 - sizeof *arg - 1u < len) {
    (void)fprintf(stderr, "%s: size_t overflow in arg_append_copy\n", pname);
    fatal();
  }
  arg = xalloc(sizeof *arg + len + 1u);
  copy = (char *)(arg + 1);
  (void)memcpy(copy, str, len);
  copy[len] = '\0';

  arg->next = 0;
  arg->val = copy;
  *link_at = arg;
  return &arg->next;
}


/*
  Build the argument list, parse options into "*opts", and leave only the
  operands linked at "*operands" (null if there are none).

  Arguments come from the environment variables in ev_name[] (split on
  envsep), followed by argv[1 .. argc-1]. Option parsing stops at "--".
  Help requests print usage and exit. Invalid options are fatal.
*/
static void
opts_setup(struct opts *opts, struct arg **operands, size_t argc, char **argv)
{
  struct arg **link_at;
  long mx_worker;

  /*
    Create a homogeneous argument list from the recognized environment
    variables and from the command line.
  */
  *operands = 0;
  link_at = operands;
  {
    size_t ofs;

    for (ofs = 0u; ofs < sizeof ev_name / sizeof ev_name[0]; ++ofs) {
      const char *ev_val;

      ev_val = getenv(ev_name[ofs]);
      if (0 != ev_val) {
        const char *pos;

        /*
          The string returned by getenv() must not be modified, so split it
          read-only with strspn()/strcspn() and copy each token, instead of
          letting strtok() overwrite the separators in place.
        */
        pos = ev_val + strspn(ev_val, envsep);
        while ('\0' != *pos) {
          size_t toklen;

          toklen = strcspn(pos, envsep);
          link_at = arg_append_copy(link_at, pos, toklen);
          pos += toklen;
          pos += strspn(pos, envsep);
        }
      }
    }

    for (ofs = 1u; ofs < argc; ++ofs) {
      struct arg *arg;

      arg = xalloc(sizeof *arg);
      arg->next = 0;
      arg->val = argv[ofs];
      *link_at = arg;
      link_at = &arg->next;
    }
  }


  /* Effectuate option defaults. */
  mx_worker = sysconf(_SC_THREAD_THREADS_MAX);
  if (-1L == mx_worker) {
    mx_worker = LONG_MAX;
  }
  if (UINT_MAX < (long unsigned)mx_worker) {
    mx_worker = UINT_MAX;
  }
  /* num_slot = num_worker * blf must fit in an unsigned. */
  if (UINT_MAX / blf < (unsigned)mx_worker) {
    mx_worker = UINT_MAX / blf;
  }
  if ((size_t)-1 / sizeof(pthread_t) < (unsigned)mx_worker) {
    mx_worker = (size_t)-1 / sizeof(pthread_t);
  }

  opts->num_worker = 0u;
  opts->outmode = OM_REGF;
  opts->decompress = 0;
  if (0 == strcmp("bunzip2", pname) || 0 == strcmp("lbunzip2", pname)) {
    opts->decompress = 1;
  }
  else {
    if (0 == strcmp("bzcat", pname) || 0 == strcmp("lbzcat", pname)) {
      opts->outmode = OM_STDOUT;
      opts->decompress = 1;
    }
  }
  opts->bs100k = 9;
  opts->verbose = 0;
  opts->print_cctrs = 0;


  /*
    Process and remove all arguments that are options or option arguments. The
    remaining arguments are the operands.
  */
  link_at = operands;
  {
    enum
    {
      AS_CONTINUE,  /* Continue argument processing. */
      AS_STOP,      /* Processing finished because of "--". */
      AS_USAGE      /* Processing finished because user asked for help. */
    } args_state;
    struct arg *arg,
        *next;

    args_state = AS_CONTINUE;
    for (arg = *operands; 0 != arg && AS_CONTINUE == args_state; arg = next) {
      const char *argscan;

      argscan = arg->val;
      if ('-' != *argscan) {
        /* This is an operand, keep it. */
        link_at = &arg->next;
        next = arg->next;
      }
      else {
        /* This argument holds options and possibly an option argument. */
        ++argscan;

        if ('-' == *argscan) {
          ++argscan;

          if ('\0' == *argscan) {
            args_state = AS_STOP;
          }
          else if (0 == strcmp("stdout", argscan)) {
            opts_outmode(opts, 'c');
          }
          else if (0 == strcmp("test", argscan)) {
            opts_outmode(opts, 't');
          }
          else if (0 == strcmp("decompress", argscan)) {
            opts_decompress(opts, 'd');
          }
          else if (0 == strcmp("compress", argscan)) {
            opts_decompress(opts, 'z');
          }
          else if (0 == strcmp("fast", argscan)) {
            opts->bs100k = 1;
          }
          else if (0 == strcmp("best", argscan)) {
            opts->bs100k = 9;
          }
          else if (0 == strcmp("verbose", argscan)) {
            opts->verbose = 1;
          }
          else if (0 == strcmp("help", argscan)
              || 0 == strcmp("license", argscan)
              || 0 == strcmp("version", argscan)) {
            args_state = AS_USAGE;
          }
          else if (0 != strcmp("force", argscan)
              && 0 != strcmp("keep", argscan)
              && 0 != strcmp("small", argscan)
              && 0 != strcmp("quiet", argscan)
              && 0 != strcmp("repetitive-fast", argscan)
              && 0 != strcmp("repetitive-best", argscan)) {
            (void)fprintf(stderr, "%s: unknown option \"%s\", specify \"-h\""
                " for help\n", pname, arg->val);
            fatal();
          }
        } /* long option */
        else {
          int cont;

          cont = 1;
          do {
            switch (*argscan) {
              case '\0': cont = 0; break;

              case 'c': case 't':
                opts_outmode(opts, *argscan);
                break;

              case 'd': case 'z':
                opts_decompress(opts, *argscan);
                break;

              case '1': case '2': case '3': case '4': case '5': case '6':
              case '7': case '8': case '9':
                opts->bs100k = *argscan - '0';
                break;

              case 'v': opts->verbose = 1; break;

              case 'S': opts->print_cctrs = 1; break;

              case 'f': case 'k': case 's': case 'q':
                break;

              case 'h': case 'L': case 'V':
                args_state = AS_USAGE;
                cont = 0;
                break;

              case 'n':
                ++argscan;

                if ('\0' == *argscan) {
                  /* Drop this argument, as it wasn't an operand. */
                  next = arg->next;
                  (*freef)(arg);
                  *link_at = next;

                  /* Move to next argument, which is an option argument. */
                  arg = next;
                  if (0 == arg) {
                    (void)fprintf(stderr, "%s: option \"-%.1s\" requires an"
                        " argument, specify \"-h\" for help\n", pname,
                        argscan - 1);
                    fatal();
                  }
                  argscan = arg->val;
                }

                opts->num_worker = xstrtol(argscan, "\"-n\"", 1L, mx_worker);
                cont = 0;
                break;

              default:
                (void)fprintf(stderr, "%s: unknown option \"-%.1s\", specify"
                    " \"-h\" for help\n", pname, argscan);
                fatal();
            } /* switch (*argscan) */

            ++argscan;
          } while (cont);
        } /* cluster of short options */

        /* This wasn't an operand, drop it. */
        next = arg->next;
        (*freef)(arg);
        *link_at = next;
      } /* argument holds options */
    } /* arguments loop */

    if (AS_USAGE == args_state) {
      for (arg = *operands; 0 != arg; arg = next) {
        next = arg->next;
        (*freef)(arg);
      }
      usage(mx_worker);
    }
  } /* process arguments */


  /* Finalize options. */
  if (OM_REGF == opts->outmode && 0 == *operands) {
    opts->outmode = OM_STDOUT;
  }

  if (opts->decompress) {
    if (0 == *operands && isatty(STDIN_FILENO)) {
      (void)fprintf(stderr, "%s: won't read compressed data from a terminal,"
          " specify \"-h\" for help\n", pname);
      fatal();
    }
  }
  else {
    if (OM_STDOUT == opts->outmode && isatty(STDOUT_FILENO)) {
      (void)fprintf(stderr, "%s: won't write compressed data to a terminal,"
          " specify \"-h\" for help\n", pname);
      fatal();
    }
  }

  if (0u == opts->num_worker) {
#ifdef _SC_NPROCESSORS_ONLN
    long num_online;

    num_online = sysconf(_SC_NPROCESSORS_ONLN);
    if (-1 == num_online) {
      (void)fprintf(stderr, "%s: number of online processors unavailable,"
          " specify \"-h\" for help\n", pname);
      fatal();
    }
    assert(1L <= num_online);
    opts->num_worker = (mx_worker < num_online) ? mx_worker : num_online;
#else
    (void)fprintf(stderr, "%s: WORKER-THREADS not set, specify \"-h\" for"
        " help\n", pname);
    fatal();
#endif
  }
}


/*
  Tracing replacement for malloc(): allocate, then log the request and its
  result to stderr. On allocation failure, malloc()'s errno is restored
  after logging, since fprintf() may change errno.
*/
static void *
trace_malloc(size_t size)
{
  void *mem;
  int malloc_errno;

  mem = malloc(size);
  malloc_errno = errno;

  if (fprintf(stderr, "%lu: malloc(%lu) == %p\n", (long unsigned)pid,
      (long unsigned)size, mem) < 0) {
    fatal();
  }

  if (0 == mem) {
    errno = malloc_errno;
  }
  return mem;
}


/* Tracing replacement for free(): log the pointer to stderr, then free it. */
static void
trace_free(void *ptr)
{
  int written;

  written = fprintf(stderr, "%lu: free(%p)\n", (long unsigned)pid, ptr);
  if (written < 0) {
    fatal();
  }
  free(ptr);
}


/*
  Tracing allocator with a bzalloc-like signature: allocate "n" items of "m"
  bytes each through trace_malloc(). Negative counts and a size_t overflow of
  n * m are caught only by assertions.
*/
static void *
trace_bzalloc(void *ignored, int n, int m)
{
  assert(0 <= n && 0 <= m);
  /* With n == 0 the product is 0, so only n > 0 needs the overflow check. */
  assert(0 == n || (size_t)m <= (size_t)-1 / (size_t)n);

  return trace_malloc((size_t)n * (size_t)m);
}


/* Tracing deallocator with a bzfree-like signature; forwards to trace_free(). */
static void
trace_bzfree(void *ignored, void *ptr)
{
  (void)ignored;
  trace_free(ptr);
}


/* sigemptyset() wrapper; failure is fatal. */
static void
xsigemptyset(sigset_t *set)
{
  int rc = sigemptyset(set);

  if (rc == -1) {
    fail("sigemptyset()", errno);
  }
}


/* sigaddset() wrapper; failure is fatal. */
static void
xsigaddset(sigset_t *set, int signo)
{
  int rc = sigaddset(set, signo);

  if (rc == -1) {
    fail("sigaddset()", errno);
  }
}


/* pthread_sigmask() wrapper for the calling thread; failure is fatal. */
static void
xsigmask(int how, const sigset_t *set, sigset_t *oset)
{
  int err = pthread_sigmask(how, set, oset);

  if (err != 0) {
    fail("pthread_sigmask()", err);
  }
}


/*
  Install "handler" for "sig", with no flags and no additional signals blocked
  while the handler runs. Failure is fatal.
*/
static void
xsigaction(int sig, void (*handler)(int))
{
  struct sigaction act;

  act.sa_flags = 0;
  act.sa_handler = handler;
  xsigemptyset(&act.sa_mask);

  if (sigaction(sig, &act, 0) == -1) {
    fail("sigaction()", errno);
  }
}


/* Codes stored by sighandler(); 0 means no signal has been caught yet. */
enum caught_sig
{
  CS_INT = 1,
  CS_TERM,
  CS_USR1,
  CS_USR2
};

/* The most recently caught signal, as an enum caught_sig value. */
static volatile sig_atomic_t caught_sig;


/*
  Handler for SIGINT, SIGTERM, SIGUSR1 and SIGUSR2: record which one
  arrived. A small enum code is stored rather than the signal number, because
  sig_atomic_t is nowhere required to be able to hold signal values.
*/
static void
sighandler(int sig)
{
  if (SIGINT == sig) {
    caught_sig = CS_INT;
  }
  else if (SIGTERM == sig) {
    caught_sig = CS_TERM;
  }
  else if (SIGUSR1 == sig) {
    caught_sig = CS_USR1;
  }
  else if (SIGUSR2 == sig) {
    caught_sig = CS_USR2;
  }
  else {
    assert(0);
  }
}


/*
  Suffix table with two uses:
  a) deciding whether an operand already looks compressed (only entries with
     "chk_compr" set take part);
  b) mapping a compressed suffix to the suffix of the decompressed output.
*/
struct suffix
{
  const char *compr;   /* Suffix of a compressed file. */
  size_t compr_len;    /* Its length (not size). */
  const char *decompr; /* Corresponding suffix of the decompressed file. */
  size_t decompr_len;  /* Its length (not size). */
  int chk_compr;       /* Nonzero if "compr" is suited for use a). */
};

/*
  Searched in order. The final entry has an empty compressed suffix, so it
  matches any name; it is excluded from use a).
*/
static const struct suffix suffix[] = {
    { ".bz2",  sizeof ".bz2"  - 1u, "",     sizeof ""     - 1u, 1 },
    { ".tbz2", sizeof ".tbz2" - 1u, ".tar", sizeof ".tar" - 1u, 1 },
    { ".tbz",  sizeof ".tbz"  - 1u, ".tar", sizeof ".tar" - 1u, 1 },
    { ".tz2",  sizeof ".tz2"  - 1u, ".tar", sizeof ".tar" - 1u, 1 },
    { "",      sizeof ""      - 1u, ".out", sizeof ".out" - 1u, 0 }
};


/*
  If "decompr_pathname" is NULL: check if "compr_pathname" has a compressed
  suffix. If "decompr_pathname" is not NULL: allocate and format a pathname
  for storing the decompressed output -- this always returns 1.
*/
static int
suffix_xform(const char *compr_pathname, char **decompr_pathname)
{
  size_t len,
      ofs;

  len = strlen(compr_pathname);
  for (ofs = 0u; ofs < sizeof suffix / sizeof suffix[0]; ++ofs) {
    if ((suffix[ofs].chk_compr || 0 != decompr_pathname)
        && len >= suffix[ofs].compr_len) {
      size_t prefix_len;

      prefix_len = len - suffix[ofs].compr_len;
      if (0 == strcmp(compr_pathname + prefix_len, suffix[ofs].compr)) {
        if (0 != decompr_pathname) {
          if ((size_t)-1 - prefix_len < suffix[ofs].decompr_len + 1u) {
            (void)fprintf(stderr, "%s: size_t overflow in dpn_alloc\n", pname);
            fatal();
          }
          *decompr_pathname
              = xalloc(prefix_len + suffix[ofs].decompr_len + 1u);
          (void)memcpy(*decompr_pathname, compr_pathname, prefix_len);
          (void)strcpy(*decompr_pathname + prefix_len, suffix[ofs].decompr);
        }
        return 1;
      }
    }
  }
  assert(0 == decompr_pathname);
  return 0;
}


/*
  Return a file descriptor to read from; -1 if input is unavailable (skipping).

  With no operand, stdin is used. Otherwise, the operand is skipped (with a
  message on stderr) if:
  - it cannot be stat()-ed;
  - it is not a regular file;
  - when compressing, it already has a compressed suffix;
  - it cannot be opened;
  - the opened descriptor is not the regular file that was checked.
*/
static int
input_init(const struct arg *operand, int decompress)
{
  struct stat statbuf;

  if (0 == operand) {
    return STDIN_FILENO;
  }

  if (-1 == stat(operand->val, &statbuf)) {
    (void)fprintf(stderr, "%s: skipping \"%s\": stat(): %s\n", pname,
        operand->val, strerror(errno));
  }
  else if (!S_ISREG(statbuf.st_mode)) {
    (void)fprintf(stderr, "%s: skipping \"%s\": not a regular file\n",
        pname, operand->val);
  }
  else if (!decompress && suffix_xform(operand->val, 0)) {
    (void)fprintf(stderr, "%s: skipping \"%s\": compressed suffix\n",
        pname, operand->val);
  }
  else {
    int ret;

    ret = open(operand->val, O_RDONLY | O_NOCTTY);
    if (-1 == ret) {
      (void)fprintf(stderr, "%s: skipping \"%s\": open(): %s\n", pname,
          operand->val, strerror(errno));
    }
    else {
      struct stat fdbuf;

      /*
        The pathname may have been replaced between stat() and open() (e.g.
        by a directory or a device). Re-check the object actually opened.
      */
      if (-1 == fstat(ret, &fdbuf)) {
        (void)fprintf(stderr, "%s: skipping \"%s\": fstat(): %s\n", pname,
            operand->val, strerror(errno));
      }
      else if (!S_ISREG(fdbuf.st_mode) || fdbuf.st_dev != statbuf.st_dev
          || fdbuf.st_ino != statbuf.st_ino) {
        (void)fprintf(stderr, "%s: skipping \"%s\": changed while being"
            " opened\n", pname, operand->val);
      }
      else {
        return ret;
      }

      /* Read-only descriptor we are abandoning; nothing to flush. */
      (void)close(ret);
    }
  }

  if (ferror(stderr)) {
    fatal();
  }
  return -1;
}


/*
  Close the input descriptor obtained from input_init(). A failure is fatal;
  the message names the operand, or STDIN_FILENO if there was none.
*/
static void
input_uninit(const struct arg *operand, int infd)
{
  int from_stdin = (0 == operand);

  if (-1 != close(infd)) {
    return;
  }

  assert(!from_stdin || STDIN_FILENO == infd);
  if (from_stdin) {
    fail("close(%s%s%s)", errno, "", "STDIN_FILENO", "");
  }
  else {
    fail("close(%s%s%s)", errno, "\"", operand->val, "\"");
  }
}


/*
  Select the output for one job. Returns 0 on success, or -1 if the output is
  unavailable and the operand should be skipped.

  - OM_STDOUT: "*outfd" is STDOUT_FILENO.
  - OM_DISCARD: "*outfd" is -1.
  - OM_REGF: create a new file exclusively (O_EXCL, so existing files are never
    overwritten) and store its descriptor in "*outfd". Its dynamically
    allocated pathname goes to "*output_pathname". When decompressing, the
    name comes from suffix_xform(); when compressing, ".bz2" is appended.
*/
static int
output_init(const struct arg *operand, int decompress, enum outmode outmode,
    int *outfd, char **output_pathname)
{
  char *pathn;

  if (OM_STDOUT == outmode) {
    *outfd = STDOUT_FILENO;
    return 0;
  }
  if (OM_DISCARD == outmode) {
    *outfd = -1;
    return 0;
  }
  if (OM_REGF != outmode) {
    assert(0);
    return -1;
  }

  assert(0 != operand);
  if (decompress) {
    (void)suffix_xform(operand->val, &pathn);
  }
  else {
    size_t len = strlen(operand->val);

    if (len > (size_t)-1 - sizeof ".bz2") {
      (void)fprintf(stderr, "%s: size_t overflow in cpn_alloc\n", pname);
      fatal();
    }
    pathn = xalloc(len + sizeof ".bz2");
    (void)memcpy(pathn, operand->val, len);
    (void)strcpy(pathn + len, ".bz2");
  }

  *outfd = open(pathn, O_WRONLY | O_CREAT | O_EXCL | O_NOCTTY,
      S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
  if (-1 != *outfd) {
    *output_pathname = pathn;
    return 0;
  }

  if (0 > fprintf(stderr, "%s: skipping \"%s\": open(\"%s\"):"
      " %s\n", pname, operand->val, pathn, strerror(errno))) {
    fatal();
  }
  (*freef)(pathn);
  return -1;
}


/*
  Run one (de)compression job from "infd" to "outfd" and wait for it to end.

  Starts a muxer thread:
  - lbunzip2_single_wrap() for decompression with one worker;
  - lbunzip2_wrap() for decompression with several workers;
  - lbzip2_wrap() for compression.
  Then suspends the main thread with signal mask "unblocked" until a caught
  signal arrives. The caller (main()) blocks those signals via sigs_mod()
  beforehand.
  - SIGINT/SIGTERM: remove the partial output file (if any), then re-raise the
    signal with its default action.
  - SIGUSR1: a sub-thread reported an error through fatal(); exit via fatal().
  - SIGUSR2: the muxer finished successfully; join it and return.
*/
static void
process(const struct opts *opts, const struct arg *operand, unsigned num_slot,
    int infd, int outfd, const sigset_t *unblocked)
{
  /*
    We could wait for signals with either sigwait() or sigsuspend(). SUSv2
    states about sigwait() that its effect on signal actions is unspecified.
    SUSv3 still claims the same.

    The SUSv2 description of sigsuspend() talks about both the thread and the
    whole process being suspended until a signal arrives, although thread
    suspension seems much more likely from the wording. They note that they
    filed a clarification request for this. SUSv3 cleans this up and chooses
    thread suspension which was more logical anyway.

    I favor sigsuspend() because I need to re-raise SIGTERM and SIGINT, and
    unspecified action behavior with sigwait() seems messy.

    13-OCT-2009 lacos
  */

  /*
    Argument block for the muxer; only the member matching the chosen muxer
    is filled in. It lives in this stack frame and is passed by address, so
    this function must not return before the muxer is joined.
  */
  union
  {
    struct lbunzip2_single_arg lbunzip2_single;
    struct lbunzip2_arg        lbunzip2;
    struct lbzip2_arg          lbzip2;
  } muxer_arg;
  pthread_t muxer;

  /* -v: announce the job, e.g. compressing "x" to "x.bz2". */
  if (opts->verbose && 0 > fprintf(stderr, "%s: %s %s%s%s to %s%s%s\n",
      pname, opts->decompress ? "decompressing" : "compressing",

      0 != operand ? "\""          : "",
      0 != operand ? operand->val  : "stdin",
      0 != operand ? "\""          : "",

      OM_REGF == opts->outmode ? "\""   : "",
      OM_REGF == opts->outmode ? opathn : OM_DISCARD == opts->outmode
                                         ? "the bit bucket"
                                         : "stdout",
      OM_REGF == opts->outmode ? "\""   : "")) {
    fatal();
  }

  if (opts->decompress) {
    if (1u == opts->num_worker) {
      muxer_arg.lbunzip2_single.num_slot = num_slot;
      muxer_arg.lbunzip2_single.print_cctrs = opts->print_cctrs;
      muxer_arg.lbunzip2_single.infd = infd;
      muxer_arg.lbunzip2_single.outfd = outfd;
      xcreate(&muxer, lbunzip2_single_wrap, &muxer_arg.lbunzip2_single);
    }
    else {
      muxer_arg.lbunzip2.num_worker = opts->num_worker;
      muxer_arg.lbunzip2.num_slot = num_slot;
      muxer_arg.lbunzip2.print_cctrs = opts->print_cctrs;
      muxer_arg.lbunzip2.infd = infd;
      muxer_arg.lbunzip2.outfd = outfd;
      xcreate(&muxer, lbunzip2_wrap, &muxer_arg.lbunzip2);
    }
  }
  else {
    muxer_arg.lbzip2.num_worker = opts->num_worker;
    muxer_arg.lbzip2.num_slot = num_slot;
    muxer_arg.lbzip2.print_cctrs = opts->print_cctrs;
    muxer_arg.lbzip2.infd = infd;
    muxer_arg.lbzip2.outfd = outfd;
    muxer_arg.lbzip2.bs100k = opts->bs100k;
    xcreate(&muxer, lbzip2_wrap, &muxer_arg.lbzip2);
  }

  /* Unblock signals, wait for them, then block them again. */
  {
    int ret;

    /* sigsuspend() only ever returns -1/EINTR, after a handler has run. */
    ret = sigsuspend(unblocked);
    assert(-1 == ret && EINTR == errno);
  }

  /* caught_sig was set by sighandler() during sigsuspend(). */
  switch (caught_sig) {
    case CS_INT:
    case CS_TERM:
      /* Don't leave a partial output file behind. */
      if (0 != opathn) {
        (void)unlink(opathn);
        (*freef)(opathn);
        opathn = 0;
      }

      {
        int sig;
        sigset_t mask;

        sig = (CS_INT == caught_sig) ? SIGINT : SIGTERM;
        /*
          We might have inherited a SIG_IGN from the parent, but that would
          make no sense here. 24-OCT-2009 lacos
        */
        xsigaction(sig, SIG_DFL);
        /*
          The signal is still blocked, so it stays pending until the unblock
          below, which then terminates the process with the default action.
        */
        xraise(sig);

        xsigemptyset(&mask);
        xsigaddset(&mask, sig);
        xsigmask(SIG_UNBLOCK, &mask, 0);
      }
      /*
        We shouldn't reach this point, but if we do for some reason, fall
        through.
      */

    case CS_USR1:
      /* Error from a non-main thread via fatal(). */
      fatal();

    case CS_USR2:
      /* Muxer thread joined other sub-threads and finished successfully. */
      break;

    default:
      assert(0);
  }

  xjoin(muxer);
}


/*
  With "block_n_catch" set: block SIGINT, SIGTERM, SIGUSR1 and SIGUSR2 for the
  calling thread (saving the previous mask in "*oset"), then install
  sighandler() for them.

  With "block_n_catch" clear: reset those four signals to SIG_DFL, then
  restore the mask saved in "*oset".
*/
static void
sigs_mod(int block_n_catch, sigset_t *oset)
{
  static const int sigs[] = { SIGINT, SIGTERM, SIGUSR1, SIGUSR2 };
  const size_t n_sigs = sizeof sigs / sizeof sigs[0];
  void (*handler)(int) = SIG_DFL;
  size_t i;

  if (block_n_catch) {
    sigset_t mask;

    xsigemptyset(&mask);
    for (i = 0u; i < n_sigs; ++i) {
      xsigaddset(&mask, sigs[i]);
    }
    xsigmask(SIG_BLOCK, &mask, oset);

    handler = sighandler;
  }

  for (i = 0u; i < n_sigs; ++i) {
    xsigaction(sigs[i], handler);
  }

  if (!block_n_catch) {
    xsigmask(SIG_SETMASK, oset, 0);
  }
}


/*
  Entry point. Record process/thread identity and the program name, pick the
  allocator hooks, parse options, then (de)compress each operand in turn (or
  stdin when there are no operands). Exits via _exit(); errors exit through
  fatal().
*/
int
main(int argc, char **argv)
{
  struct opts opts;
  struct arg *operands;
  unsigned num_slot;

  main_thread = pthread_self();
  pid = getpid();

  /*
    A program may be executed with an empty argument vector, in which case
    argv[0] is a null pointer and must not be passed to strrchr(). Fall back
    to the name of the default (compressing) personality.
  */
  if (argc < 1 || 0 == argv[0]) {
    pname = "lbzip2";
  }
  else {
    pname = strrchr(argv[0], '/');
    pname = pname ? pname + 1 : argv[0];
  }

  {
    const char *ev_val;

    ev_val = getenv(ev_trace);
    if (0 != ev_val && '\0' != *ev_val) {
      mallocf = trace_malloc;
      freef = trace_free;
      lbzallocf = trace_bzalloc;
      lbzfreef = trace_bzfree;
    }
    else {
      mallocf = malloc;
      freef = free;
    }
  }

  /* Report broken pipes and file size limits as write errors, not signals. */
  xsigaction(SIGPIPE, SIG_IGN);
  xsigaction(SIGXFSZ, SIG_IGN);

  opts_setup(&opts, &operands, argc, argv);

  assert(UINT_MAX / blf >= opts.num_worker);
  num_slot = opts.num_worker * blf;

  do {
    int infd;

    infd = input_init(operands, opts.decompress);
    if (-1 != infd) {
      sigset_t unblocked;
      int outfd;

      /*
        Block and catch the termination/notification signals before creating
        the output file, so opathn is always set before a SIGINT/SIGTERM can
        be acted upon.
      */
      sigs_mod(1, &unblocked);
      if (-1 != output_init(operands, opts.decompress, opts.outmode, &outfd,
          &opathn)) {
        process(&opts, operands, num_slot, infd, outfd, &unblocked);

        if (OM_REGF == opts.outmode) {
          assert(0 != opathn);
          if (-1 == close(outfd)) {
            fail("close(\"%s\")", errno, opathn);
          }
          (*freef)(opathn);
          opathn = 0;
        }
      } /* output available or discarding */
      sigs_mod(0, &unblocked);

      input_uninit(operands, infd);
    } /* input available */

    /* Move to next operand. */
    if (0 != operands) {
      struct arg *next;

      next = operands->next;
      (*freef)(operands);
      operands = next;
    }
  } while (0 != operands);

  assert(0 == opathn);
  if (OM_STDOUT == opts.outmode && -1 == close(STDOUT_FILENO)) {
    fail("close(STDOUT_FILENO)", errno);
  }

  _exit(EXIT_SUCCESS);
}

Generated by  Doxygen 1.6.0   Back to index