/*
  To build:

  gcc -pthread thtestc.c -o thtestc -lm

  To run:

  first console/machine:
  $ ./thtestc server

  second console/machine:
  $ ./thtestc client address-of-server

  or (benchmark mode)
  $ ./thtestc both
  equivalent to running server and then client with localhost

  What this does:

  ======================================================================
  server part consists of 3 threads

  SELECT THREAD:
    watches client socket for data (using select)
    whenever select returns, reads the data (one char)
    adds client + data to the main thread's queue and notifies main thread

  SLEEP THREAD:
    sleeps for 40 milliseconds
    increments the main thread's sequence number and notifies main thread

  MAIN THREAD:
    waits for any notification

    goes through the queue
      if data on the client socket is 'U' {
        sets seen_U variable
        removes this item from the queue
      }

    if seen_U variable is set {
      selects first item from the queue with data == 'L'
      if the sequence number of the item's client is less than main's {
        if the client's sequence number + 1 != main's sequence number
          complains

        sends 'L' to the client
        updates the client's sequence number to match that of main
        removes this item from the queue
        clears seen_U
      }
    }

  ======================================================================
  client part is one simple thread

  loop:
      sends 'L' to server
      blocking reads from server (the value must be 'L')
      busy loops
      sends U to server
      sleeps for a while

  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  With a 250 HZ kernel the client loops a lot less (i.e. misses wakeups,
  so to speak) than with a 100 HZ one. In fact:

  $ uname -r
  2.6.17.6

  $ cat /proc/cpuinfo
  processor	: 0
  vendor_id	: AuthenticAMD
  cpu family	: 15
  model		: 43
  model name	: AMD Athlon(tm)64 X2 Dual Core Processor  3800+
  stepping	: 1
  cpu MHz		: 2000.000
  cache size	: 512 KB
  physical id	: 0
  siblings	: 2
  core id		: 0
  cpu cores	: 2
  fdiv_bug	: no
  hlt_bug		: no
  f00f_bug	: no
  coma_bug	: no
  fpu		: yes
  fpu_exception	: yes
  cpuid level	: 1
  wp		: yes
  flags		: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt lm 3dnowext 3dnow pni lahf_lm cmp_legacy ts fid vid ttp
  bogomips	: 4024.54

  processor	: 1
  vendor_id	: AuthenticAMD
  cpu family	: 15
  model		: 43
  model name	: AMD Athlon(tm)64 X2 Dual Core Processor  3800+
  stepping	: 1
  cpu MHz		: 2000.000
  cache size	: 512 KB
  physical id	: 0
  siblings	: 2
  core id		: 1
  cpu cores	: 2
  fdiv_bug	: no
  hlt_bug		: no
  f00f_bug	: no
  coma_bug	: no
  fpu		: yes
  fpu_exception	: yes
  cpuid level	: 1
  wp		: yes
  flags		: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt lm 3dnowext 3dnow pni lahf_lm cmp_legacy ts fid vid ttp
  bogomips	: 4018.63

  `./thtestc both' output looks something like this

  250hz
  client received 423
  server received 500

  100hz
  client received 499
  server received 500

  Pretty much the same results were obtained with the kernel compiled
  without SMP support.

  No frequency scaling.

  **********************************************************************
  Same code run on another (a lot less powerful) machine

  $ uname -r
  2.4.30

  $ cat /proc/cpuinfo 
  processor       : 0
  vendor_id       : AuthenticAMD
  cpu family      : 6
  model           : 4
  model name      : AMD Athlon(tm) Processor
  stepping        : 4
  cpu MHz         : 1050.052
  cache size      : 256 KB
  fdiv_bug        : no
  hlt_bug         : no
  f00f_bug        : no
  coma_bug        : no
  fpu             : yes
  fpu_exception   : yes
  cpuid level     : 1
  wp              : yes
  flags           : fpu vme de tsc msr pae mce cx8 sep mtrr pge mca cmov pat pse36 mmx fxsr syscall mmxext 3dnowext 3dnow
  bogomips        : 2090.59

  client received 404
  server received 499

*/

/* Ask the system headers to expose GNU/BSD extensions; this has to come
   before the first #include to take effect. */
#define _GNU_SOURCE
/* When defined, do_usleep sleeps with nanosleep; when it is not defined,
   do_usleep falls back to select with a timeout. */
#define USE_NANOSLEEP

/* Uncomment to make the server print a trace line to stderr after every
   wakeup of its main loop. */
/* #define VERBOSE */

#include <err.h>
#include <time.h>
#include <math.h>
#include <netdb.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/queue.h>
#include <sys/socket.h>
#include <netinet/in.h>

/* TCP port the server listens on and the client connects to
   (0xdead == 57005). */
#define PORT 0xdead

/* TIMING_START / TIMING_END bracket a statement or block and measure how
   much wall-clock time (gettimeofday) it takes.

   TIMING_START opens a `for (;;) {' scope, declares sec, tv1 and tv2 inside
   it and records the start time.  It intentionally leaves the brace open;
   the matching TIMING_END closes it.  Because every pair declares its
   variables in its own scope, pairs can be nested.  A gettimeofday failure
   terminates the process via err. */
#define TIMING_START for (;;) {                 \
    double sec;                                 \
    struct timeval tv1, tv2;                    \
    if (gettimeofday (&tv1, NULL) < 0)          \
        err (1, "%s:gettimeofday1", __func__);


/* TIMING_END (name, cond) records the end time, computes the elapsed
   seconds into sec and, when cond is true, prints "<name> took <sec> secs".
   The final break leaves the for (;;) opened by TIMING_START, so the
   bracketed code runs once; the closing brace ends that scope. */
#define TIMING_END(name, cond)                  \
    if (gettimeofday (&tv2, NULL) < 0)          \
        err (1, "%s:gettimeofday2", __func__);  \
                                                \
    sec = (tv2.tv_sec + tv2.tv_usec * 1e-6)     \
        - (tv1.tv_sec + tv1.tv_usec * 1e-6);    \
                                                \
    if (cond)                                   \
        printf (#name " took %f secs\n", sec);  \
    break;                                      \
}

struct client;

/* One byte received from a client, queued for the main server thread.
   Allocated by select_notifier and freed by the server loop once it has
   been handled. */
struct event {
    struct client *client;      /* client the byte was read from */
    char value;                 /* the byte itself ('L' or 'U' in this protocol) */
    TAILQ_ENTRY (event) entries;        /* linkage in struct server's head */
};

/* State shared between the main server thread and its two helper threads. */
struct server {
    pthread_mutex_t mutex;      /* taken by the notifiers while they change seq or head */
    pthread_cond_t cond;        /* signalled by both notifiers, waited on by the main thread */
    int seq;                    /* tick counter, incremented by sleep_notifier */
    TAILQ_HEAD (listhead, event) head;  /* pending events, appended by select_notifier */
};

/* Server-side view of the single connected client. */
struct client {
    int sock;   /* connected socket returned by accept */
    int seq;    /* server seq at the time 'L' was last sent to this client; starts at 0 */
};

/* Argument block handed to select_loop. */
struct select_arg {
    struct server *server;      /* passed through to notifier */
    struct client *client;      /* client whose socket is watched */
    /* called with every byte read from the client's socket */
    void (*notifier) (struct server *server, struct client *client, char val);
};

/* Argument block handed to sleep_loop. */
struct sleep_arg {
    struct server *server;      /* passed through to notifier */
    /* called after every sleep interval */
    void (*notifier) (struct server *server);
};

/* Listen on the given TCP port (all local IPv4 addresses) and block until
   one connection has been accepted.

   port: TCP port number in host byte order.

   Returns a struct client holding the connected socket, with its sequence
   number set to 0.  Any failure terminates the process through err.

   Only a single client is ever served, so the listening socket is closed
   as soon as the connection has been accepted. */
static struct client do_accept (int port)
{
    int sock, sock2;
    struct sockaddr_in name;
    struct sockaddr_in client_name;
    socklen_t client_name_size; /* accept wants a socklen_t *, not a size_t * */
    int one = 1;
    struct client client;

    sock = socket (PF_INET, SOCK_STREAM, 0);
    if (sock < 0)
        err (1, "socket");

    /* allow quick restarts while old connections linger in TIME_WAIT */
    if (setsockopt (sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof (one)))
        err (1, "setsockopt");

    /* clear padding (sin_zero) before handing the address to bind */
    memset (&name, 0, sizeof (name));
    name.sin_family = AF_INET;
    name.sin_port = htons (port);
    name.sin_addr.s_addr = htonl (INADDR_ANY);
    if (bind (sock, (struct sockaddr *) &name, sizeof (name)) < 0)
        err (1, "bind");

    if (listen (sock, 1) < 0)
        err (1, "listen");

    client_name_size = sizeof (client_name);
    sock2 = accept (sock, (struct sockaddr *) &client_name, &client_name_size);
    if (sock2 < 0)
        err (1, "accept");

    /* nobody else will be accepted; do not leak the listening descriptor */
    if (close (sock) < 0)
        err (1, "close");

    client.sock = sock2;
    client.seq = 0;
    return client;
}

/* Read a single byte from sock and return it.  Anything other than exactly
   one byte received (error or end of stream) terminates the process via
   errx, reporting the value recv returned. */
static char recv_value (int sock)
{
    char byte;
    int got = recv (sock, &byte, 1, 0);

    if (got == 1)
        return byte;

    errx (1, "recv %d", got);
}

/* Write the single byte val to sock.  If send does not report exactly one
   byte written the process is terminated via errx, reporting the value
   send returned. */
static void send_value (int sock, char val)
{
    int sent = send (sock, &val, 1, 0);

    if (sent == 1)
        return;

    errx (1, "send %d", sent);
}

/* Select thread body.

   arg points to a struct select_arg.  Forever: block in select until the
   client's socket is readable, read one byte from it and pass that byte to
   the notifier.  Unexpected select results are only reported on stderr; a
   select failure terminates the process via err. */
static void *select_loop (void *arg)
{
    struct select_arg *ctx = arg;

    while (1) {
        fd_set readable;
        int fd = ctx->client->sock;
        int ready;

        FD_ZERO (&readable);
        FD_SET (fd, &readable);

        /* no timeout: wait until the client has sent something */
        ready = select (fd + 1, &readable, NULL, NULL, NULL);
        if (ready < 0)
            err (1, "select");

        if (ready != 1)
            fprintf (stderr, "weird, select returned %d\n", ready);

        if (!FD_ISSET (fd, &readable)) {
            fprintf (stderr, "weird, no sock in rdset\n");
            continue;
        }

        ctx->notifier (ctx->server, ctx->client, recv_value (fd));
    }

    return NULL;
}

/* Sleep for usec microseconds.

   With USE_NANOSLEEP defined the sleep is done with nanosleep and is
   resumed with the remaining time whenever a signal interrupts it (a note
   is printed to stderr each time).  Otherwise select with a timeout and no
   descriptors is used.

   The interval is split into whole seconds and a sub-second remainder so
   that values of one second and more stay valid (tv_nsec / tv_usec must be
   below one second) and the multiplication cannot overflow an int.

   Any failure of the underlying call terminates the process via err. */
static void do_usleep (int usec)
{
#ifdef USE_NANOSLEEP
    struct timespec req, rem;

    req.tv_sec = usec / 1000000;
    req.tv_nsec = (long) (usec % 1000000) * 1000L;

    while (nanosleep (&req, &rem) < 0) {
        if (errno != EINTR)
            err (1, "do_usleep:nanosleep");

        fprintf (stderr, "interrupted by signal\n");
        req = rem;              /* sleep for whatever time is left */
    }
#else
    struct timeval tv;

    tv.tv_sec = usec / 1000000;
    tv.tv_usec = usec % 1000000;

    /* with no descriptors select can only time out (0) or fail */
    if (select (0, NULL, NULL, NULL, &tv))
        err (1, "do_usleep:select");
#endif
}

/* Sleep thread body.

   arg points to a struct sleep_arg.  Forever: sleep 40000 microseconds and
   then call the notifier, i.e. roughly 25 notifications per second. */
static void *sleep_loop (void *arg)
{
    struct sleep_arg *ctx = arg;

    while (1) {
        do_usleep (40000);
        ctx->notifier (ctx->server);
    }

    return NULL;
}

/* Start a new thread with default attributes running f (arg).  The thread
   handle is not kept.  If the thread cannot be created the process is
   terminated via err. */
static void spawn (void *(*f) (void *), void *arg)
{
    pthread_t tid;

    /* pthread_create returns the error number instead of setting errno */
    errno = pthread_create (&tid, NULL, f, arg);
    if (errno != 0)
        err (1, "pthread_create");
}

/* Tell the main thread that the byte `value' has arrived from `client'.

   A new event is allocated, appended to the server's queue under the
   server mutex, and the server's condition variable is signalled after the
   mutex has been released.  Allocation or pthread failures terminate the
   process via err. */
static void select_notifier (struct server *server, struct client *client,
                             char value)
{
    struct event *ev = calloc (1, sizeof (*ev));

    if (ev == NULL)
        err (1, "calloc");

    /* the event is still private to this thread, so fill it in first */
    ev->client = client;
    ev->value = value;

    errno = pthread_mutex_lock (&server->mutex);
    if (errno != 0)
        err (1, "select:pthread_mutex_lock");

    TAILQ_INSERT_TAIL (&server->head, ev, entries);

    errno = pthread_mutex_unlock (&server->mutex);
    if (errno != 0)
        err (1, "select:pthread_mutex_unlock");

    errno = pthread_cond_signal (&server->cond);
    if (errno != 0)
        err (1, "select:pthread_cond_signal");
}

/* Tell the main thread that another sleep interval (0.04 secs) is over.

   The server's sequence number is bumped under the server mutex and the
   condition variable is signalled after the mutex has been released.
   pthread failures terminate the process via err. */
static void sleep_notifier (struct server *server)
{
    errno = pthread_mutex_lock (&server->mutex);
    if (errno != 0)
        err (1, "sleep:pthread_mutex_lock");

    server->seq++;

    errno = pthread_mutex_unlock (&server->mutex);
    if (errno != 0)
        err (1, "sleep:pthread_mutex_unlock");

    errno = pthread_cond_signal (&server->cond);
    if (errno != 0)
        err (1, "sleep:pthread_cond_signal");
}

/* main server thread */
/* Number of server wakeups on which the sequence number had changed;
   updated by the server loop. */
static int global_server_received;

/* atexit handler registered by the server: print the final counter. */
static void server_atexit (void)
{
    fprintf (stdout, "server received %d\n", global_server_received);
}

/* Main server thread.

   Accepts one client on PORT, starts the select and sleep threads and then
   loops forever, waiting on the condition variable both of them signal.

   arg: when non-NULL (benchmark mode) the process exits once 500 wakeups
        with a changed sequence number have been counted; when NULL the
        loop never ends.

   Locking: pthread_cond_wait requires the mutex to be locked by the
   caller, so the mutex is taken once before the helper threads are
   started.  From then on this thread owns it at all times except while it
   is blocked inside pthread_cond_wait; the notifiers can therefore only
   touch the shared state while this thread is waiting.

   Never returns. */
static void *server (void *arg)
{
    int seen_U = 1;
    int old_seq;
    struct server server;
    struct client client;
    struct select_arg select_arg;
    struct sleep_arg sleep_arg;

    atexit (server_atexit);

    server.seq = 0;
    TAILQ_INIT (&server.head);

    /* the pthread calls return the error number; store it for err */
    if ((errno = pthread_mutex_init (&server.mutex, NULL)))
        err (1, "pthread_mutex_init");

    if ((errno = pthread_cond_init (&server.cond, NULL)))
        err (1, "pthread_cond_init");

    client = do_accept (PORT);

    select_arg.server = &server;
    select_arg.client = &client;
    select_arg.notifier = select_notifier;

    sleep_arg.server = &server;
    sleep_arg.notifier = sleep_notifier;

    /* must be held before the first pthread_cond_wait; taking it before
       the helpers exist also keeps them from notifying before we wait */
    if ((errno = pthread_mutex_lock (&server.mutex)))
        err (1, "pthread_mutex_lock");

    spawn (select_loop, &select_arg);
    spawn (sleep_loop, &sleep_arg);

    for (;;) {
        struct event *event, *next_event;

        old_seq = server.seq;
        if ((errno = pthread_cond_wait (&server.cond, &server.mutex)))
            err (1, "pthread_cond_wait");

        /* count wakeups that saw at least one tick of the sleep thread */
        if (old_seq != server.seq) {
            global_server_received += 1;
            if (arg && global_server_received == 500)
                exit (0);
        }

#ifdef VERBOSE
        fprintf (stderr, "old=%d new=%d client=%d seen_U=%d\n",
                 old_seq, server.seq, client.seq, seen_U);
#endif

        /* consume every 'U' event; remember that one was seen */
        for (event = TAILQ_FIRST (&server.head); event; event = next_event) {
            /* fetch the successor first, event may be freed below */
            next_event = TAILQ_NEXT (event, entries);

            if (event->value == 'U') {
                seen_U = 1;
                TAILQ_REMOVE (&server.head, event, entries);
                free (event);
            }
        }

        if (!seen_U)
            continue;

        /* first pending 'L' request, or NULL when there is none */
        TAILQ_FOREACH (event, &server.head, entries) {
            if (event->value == 'L')
                break;
        }

        /* answer only once a tick has passed since the previous answer */
        if (event && event->client->seq < server.seq) {
            /* more than one tick went by: report the gap */
            if (event->client->seq + 1 != server.seq) {
                fprintf (stderr, "server %d client %d\n",
                         server.seq, event->client->seq);
            }
            send_value (event->client->sock, 'L');
            event->client->seq = server.seq;
            seen_U = 0;

            TAILQ_REMOVE (&server.head, event, entries);
            free (event);
        }
    }
}

/* client thread */
/* Number of 'L' replies the client has received from the server; updated
   by the client loop. */
static int global_client_received;

/* atexit handler registered by the client: print the final counter. */
static void client_atexit (void)
{
    fprintf (stdout, "client received %d\n", global_client_received);
}

/* Handler the client installs for SIGINT and SIGQUIT: leave through exit so
   that the functions registered with atexit still run. */
static void handle_signal (int signum)
{
    (void) signum;              /* which signal arrived does not matter */
    exit (EXIT_SUCCESS);
}

/* Client thread.

   vhostname: NUL-terminated host name (or dotted address) of the server.

   Connects to PORT on that host and then loops forever:
     - sends 'L' and blocks until the server answers with 'L',
     - busy loops on some floating point work,
     - sends 'U' and sleeps for 30000 microseconds.
   Every 250th iteration the timings of the individual steps are printed.

   Resolver, socket and connect failures terminate the process.  The loop
   has no exit, so the final return is never reached; it presumably exists
   only so that `sum' counts as used. */
static void *client (void *vhostname)
{
    int sock;
    const char *hostname = vhostname;
    struct hostent *hostinfo;
    struct sockaddr_in name;
    int sum = 0;
    int run;

    atexit (client_atexit);
    signal (SIGINT, handle_signal);
    signal (SIGQUIT, handle_signal);

    /* clear padding (sin_zero) before handing the address to connect */
    memset (&name, 0, sizeof (name));
    name.sin_family = AF_INET;
    name.sin_port = htons (PORT);
    hostinfo = gethostbyname (hostname);
    if (!hostinfo) {
        /* gethostbyname reports through h_errno, not errno */
        errx (1, "gethostbyname %s: %s", hostname, hstrerror (h_errno));
    }
    /* h_addr is a char *; copy instead of dereferencing through a cast */
    memcpy (&name.sin_addr, hostinfo->h_addr, sizeof (name.sin_addr));

    sock = socket (PF_INET, SOCK_STREAM, 0);
    if (sock < 0)
        err (1, "socket");

    if (connect (sock, (struct sockaddr *) &name, sizeof (name)) < 0)
        err (1, "connect");

    for (run = 0; ; ++run) {
        char c;
        int i;

/* print the timings on every 250th iteration only */
#define COND !(run % 250)

        TIMING_START {
            TIMING_START {
                TIMING_START
                    send_value (sock, 'L');
                TIMING_END (send, COND);

                TIMING_START
                    c = recv_value (sock);
                TIMING_END (recv, COND);
                assert (c == 'L');
            } TIMING_END (trip, COND);

            global_client_received += 1;

            /* burn some CPU time between the 'L' reply and the 'U' */
            TIMING_START
                for (i = 0; i < 8192 * 8; ++i)
                    sum += (int) tan (sin (cos ((double) i)));
            TIMING_END (busy, COND);

            TIMING_START {
                TIMING_START
                    send_value (sock, 'U');
                TIMING_END (send2, COND);

                TIMING_START
                    do_usleep (30000);
                TIMING_END (sleep, COND);
            } TIMING_END (last, COND);
        } TIMING_END (whole, COND);

        if (COND)
            puts ("");
    }
    /* go through long so the int is widened before becoming a pointer */
    return (void *) (long) sum;
}

/* Entry point.  Selects the mode from the command line:

     server            run the server in this thread (does not return)
     client ADDRESS    run the client against ADDRESS (does not return)
     both              start server and client (against localhost) as
                       threads, let them run for 25 seconds, then exit

   Anything else is a usage error.  In `both' mode nothing makes the client
   wait until the server is listening; the two threads are simply started
   one after the other. */
int main (int argc, char **argv)
{
    const char *mode = (argc >= 2) ? argv[1] : NULL;

    if (argc == 2 && strcmp (mode, "server") == 0) {
        server (NULL);
    }
    else if (argc == 3 && strcmp (mode, "client") == 0) {
        client (argv[2]);
    }
    else if (argc == 2 && strcmp (mode, "both") == 0) {
        /* a non-NULL argument puts the server into benchmark mode */
        spawn (server, (void *) -1);
        spawn (client, "localhost");
        sleep (25);
    }
    else {
        errx (1, "command line must be: "
              "[server], [client server-address] or [both]");
    }

    return 0;
}
