/*
  To build:

  gcc -std=c99 -pthread thtestc.c -o thtestc
  gcc -DNO_SELECT -std=c99 -pthread thtestc.c -o thtestc-bit-faster
  gcc -DNO_SOCKETS -std=c99 -pthread thtestc.c -o thtestc-alot-faster
  gcc -DSPEEDY_RACER -std=c99 -pthread thtestc.c -o thtestc-speedy-racer

  To run:

  $ ./thtestc both

  What this does:

  ======================================================================
  server part consists of 3 threads

  SELECT THREAD:
    watches the client socket for data (using select)
    whenever select returns, reads the data (a char + timeval)
    adds client + data to the main thread's queue and notifies the main thread

  SLEEP THREAD:
    sleeps for 40 milliseconds (i.e. ticks 25 times a second)
    increments the main thread's sequence number and notifies the main thread

  MAIN THREAD:
    waits for any notification

    goes through the queue
      if data on the client socket is 'U' {
        sets seen_U variable
        removes this item from the queue
      }

    if seen_U variable is set {
      selects the first item from the queue with data == 'L'
      if the item's client sequence number is less than main's {
        if the client's sequence number + 1 != main's sequence number
          complains (more than one tick has passed since the last grant)

        sends 'L' to the client
        updates the client's sequence number to match that of main
        removes this item from the queue
        clears seen_U
      }
    }
    }

  ======================================================================
  client part is one simple thread

  loop:
      sends 'L' to server
      blocking reads from server (the value must be 'L')
      busy loops
      sends U to server
      sleeps for a while


  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  NO_SELECT uses a blocking read without select in select_loop
  NO_SOCKETS uses ad-hoc pthread_cond based communication instead of sockets
  SPEEDY_RACER enables the refinement described below

  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  With a simple refinement, however, this code runs just as expected
  (i.e. without artificial delays):

  the server, upon receiving 'U' from the client, responds with a 'U';
  the client, after sending 'U', reads that 'U' back from the server.

  This echo is redundant, yet it improves the performance tremendously.

  The situation on 250 Hz (Linux) kernels is much worse than on 100 Hz
  ones; some numbers obtained on local machines:

  legend:
  1 - 2.4.30 (100HZ) Athlon(tm) Processor 1050.052MHz
  2 - 2.6.17.6 (250HZ) Athlon(tm)64 X2 Dual Core Processor  3800+ 2000.000 MHz

  time elapsed between client sending an 'L' to the server and the server
  actually seeing it (after select/recv)

  original
  1 - 0.010
  2 - 0.016

  speedy-racer
  1 - 0.000030 - 0.000070
  2 - 0.000011

  http://groups.google.com/group/comp.unix.programmer/tree/browse_frm/thread/5394d00781547266/17c8a349edae9d3c?rnum=1&q=malc&_done=%2Fgroup%2Fcomp.unix.programmer%2Fbrowse_frm%2Fthread%2F5394d00781547266%2F17c8a349edae9d3c%3Ftvc%3D1%26q%3Dmalc%26#doc_17c8a349edae9d3c
*/

#define _GNU_SOURCE
#define USE_NANOSLEEP

/* Define VERBOSE (e.g. with -DVERBOSE) to enable the dolog() debug traces. */
/* #define VERBOSE */

/* Unconditional diagnostic output to stderr (printf-style arguments). */
#define dolog2(...) fprintf (stderr, __VA_ARGS__)

/* Nonzero: log how long every received message spent in flight.
   May be overridden on the command line, e.g. -DSHOW_TRIP_TIME=0. */
#ifndef SHOW_TRIP_TIME
#define SHOW_TRIP_TIME 1
#endif

/* Debug tracing: forwards all of its arguments to dolog2 when VERBOSE is
   defined, and expands to nothing otherwise.  (Forwarding __VA_ARGS__ is
   required: expanding to a bare `dolog2` would drop the arguments and leave
   an undeclared identifier behind.) */
#ifdef VERBOSE
#define dolog(...) dolog2 (__VA_ARGS__)
#else
#define dolog(...)
#endif

#include <err.h>
#include <time.h>
#include <math.h>
#include <netdb.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/queue.h>
#include <sys/socket.h>
#include <netinet/in.h>

/* TCP port the server listens on and the client connects to (0xdead = 57005). */
#define PORT 0xdead

/* TIMING_START ... TIMING_END (name, cond) bracket a run of statements and,
   when cond is true, print "<name> took <seconds> secs" to stdout, measured
   with gettimeofday.

   TIMING_START opens a `for (;;) {` scope, declares sec_/tv1_/tv2_ inside
   it and records the start time.  TIMING_END closes that scope with a
   `break`, so the bracketed statements run exactly once.  The two must
   always be used as a pair.  Because every pair has its own scope, pairs
   may be nested (the inner tv1_/tv2_/sec_ shadow the outer ones).

   NOTE: a `break` or `continue` among the bracketed statements acts on the
   hidden loop, not on any enclosing loop.  `break` skips the timing report,
   and `continue` re-runs the statements from TIMING_START.  A gettimeofday
   failure exits the process via err(), naming the enclosing function. */
#define TIMING_START for (;;) {                 \
    double sec_;                                \
    struct timeval tv1_, tv2_;                  \
    if (gettimeofday (&tv1_, NULL) < 0)         \
        err (1, "%s:gettimeofday1", __func__);


/* Closes the scope opened by the matching TIMING_START (see above).
   The end time is taken only when cond is true. */
#define TIMING_END(name, cond)                          \
     if (cond) {                                        \
         if (gettimeofday (&tv2_, NULL) < 0)            \
             err (1, "%s:gettimeofday2", __func__);     \
                                                        \
         sec_ = (tv2_.tv_sec + tv2_.tv_usec * 1e-6)     \
             - (tv1_.tv_sec + tv1_.tv_usec * 1e-6);     \
                                                        \
         printf (#name " took %f secs\n", sec_);        \
     }                                                  \
     break;                                             \
}

struct client;

/* One protocol message.  c is the command character ('L' or 'U' in this
   program); tv is the sender's gettimeofday() timestamp, used to measure
   the time the message spent in flight.  send_value/recv_value transfer
   the struct as raw bytes, so both peers must agree on its layout. */
struct data {
    char c;
    struct timeval tv;
};

/* A message received from a client, queued for the main server thread. */
struct event {
    struct client *client;          /* client the message came from */
    struct data data;               /* the received message */
    TAILQ_ENTRY (event) entries;    /* link in server.head */
};

/* State shared by the main server thread and its notifier threads. */
struct server {
    pthread_mutex_t mutex;          /* guards seq and head */
    pthread_cond_t cond;            /* signalled by the notifiers after each change */
    int seq;                        /* tick counter, incremented by sleep_notifier */
    TAILQ_HEAD (listhead, event) head;  /* pending events, appended at the tail */
};

/* Server-side state of the (single) connected client. */
struct client {
    int sock;   /* connected socket; with NO_SOCKETS a queue index instead */
    int seq;    /* server.seq at the time the last 'L' was sent to it */
};

/* Argument block for select_loop. */
struct select_arg {
    struct server *server;
    struct client *client;
    /* called with every message read from client */
    void (*notifier) (struct server *server, struct client *client,
                      struct data *data);
};

/* Argument block for sleep_loop. */
struct sleep_arg {
    struct server *server;
    /* called once per sleep period */
    void (*notifier) (struct server *server);
};

#ifdef NO_SOCKETS
/* In-process replacement for the TCP connection: two small blocking ring
   buffers of struct data, one per direction, all guarded by q_mutex. */
#define Q_LEN 2

/* A single mutex/condition pair serves both queues; waiters re-check their
   own queue's fill level after every broadcast. */
static pthread_mutex_t q_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t q_cond = PTHREAD_COND_INITIALIZER;

/* Fixed-capacity FIFO of messages. */
struct q {
    size_t rpos;            /* next slot to be read by get() */
    size_t wpos;            /* next slot to be written by put() */
    size_t filled;          /* number of occupied slots, 0..Q_LEN */
    struct data buf[Q_LEN];
};

/* put(qn) writes into the peer's queue global_q[!qn] and get(qn) reads its
   own global_q[qn].  With the endpoint numbers used in this file (server 0,
   client 1), global_q[0] carries client -> server traffic and global_q[1]
   carries server -> client traffic. */
static struct q global_q[2];

/* Enqueue a copy of *data for the peer of endpoint qn (into global_q[!qn]),
   blocking while that queue is full.  All waiters are woken afterwards,
   because the condition variable is shared by both queues.  Any pthread
   error exits the process. */
static void put (int qn, struct data *data)
{
    struct q *dst = &global_q[qn ? 0 : 1];

    if ((errno = pthread_mutex_lock (&q_mutex)) != 0)
        err (1, "put:pthread_mutex_lock");

    /* wait for a free slot */
    while (dst->filled == Q_LEN) {
        if ((errno = pthread_cond_wait (&q_cond, &q_mutex)) != 0)
            err (1, "put:pthread_cond_wait");
    }

    dst->buf[dst->wpos] = *data;
    dst->filled++;
    if (++dst->wpos == Q_LEN)
        dst->wpos = 0;

    if ((errno = pthread_cond_broadcast (&q_cond)) != 0)
        err (1, "put:pthread_cond_broadcast");

    if ((errno = pthread_mutex_unlock (&q_mutex)) != 0)
        err (1, "put:pthread_mutex_unlock");
}

static struct data get (int qn)
{
    struct data data;
    struct q *q = &global_q[qn];

    if ((errno = pthread_mutex_lock (&q_mutex)))
        err (1, "get:pthread_mutex_lock");

    while (!q->filled)
        if ((errno = pthread_cond_wait (&q_cond, &q_mutex)))
            err (1, "get:pthread_cond_wait");

    data = q->buf[q->rpos];
    q->rpos = (q->rpos + 1) % Q_LEN;
    q->filled -= 1;

    if (SHOW_TRIP_TIME) {
        double sec;
        struct timeval tv1;
        struct timeval tv2;

        if (gettimeofday (&tv2, NULL) < 0)
            err (1, "get:gettimeofday");

        tv1 = data.tv;
        sec = (tv2.tv_sec + tv2.tv_usec * 1e-6)
            - (tv1.tv_sec + tv1.tv_usec * 1e-6);
        dolog2 ("| %c %d in flight %f secs\n", data.c, qn, sec);
    }

    if ((errno = pthread_cond_broadcast (&q_cond)))
        err (1, "get:pthread_cond_broadcast");

    if ((errno = pthread_mutex_unlock (&q_mutex)))
        err (1, "get:pthread_mutex_unlock");

    return data;
}
#endif

/* Listen on the given TCP port (all IPv4 interfaces) and accept exactly one
   connection.

   With NO_SOCKETS there is no network: the returned client has sock 0, the
   server-side endpoint number used by put()/get().  Otherwise sock is the
   accepted connection; the listening socket is closed once the connection
   has been accepted, since only one client is ever served.  seq starts at
   0.  Any socket error exits the process. */
static struct client do_accept (int port)
{
    struct client client;

#ifdef NO_SOCKETS
    (void) port;
    client.sock = 0;
#else
    int lsock;
    int one = 1;
    struct sockaddr_in name;
    struct sockaddr_in peer;
    socklen_t peer_len;     /* accept() takes a socklen_t *, not a size_t * */

    lsock = socket (PF_INET, SOCK_STREAM, 0);
    if (lsock < 0)
        err (1, "socket");

    if (setsockopt (lsock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof (one)))
        err (1, "setsockopt");

    /* zero sin_zero and any padding before filling in the fields */
    memset (&name, 0, sizeof (name));
    name.sin_family = AF_INET;
    name.sin_port = htons (port);
    name.sin_addr.s_addr = htonl (INADDR_ANY);
    if (bind (lsock, (struct sockaddr *) &name, sizeof (name)) < 0)
        err (1, "bind");

    if (listen (lsock, 1) < 0)
        err (1, "listen");

    peer_len = sizeof (peer);
    client.sock = accept (lsock, (struct sockaddr *) &peer, &peer_len);
    if (client.sock < 0)
        err (1, "accept");

    /* the listener is no longer needed; failing to close it is harmless */
    if (close (lsock) < 0)
        warn ("close");
#endif

    client.seq = 0;
    return client;
}

/* Receive one struct data message from endpoint sock.

   With NO_SOCKETS, sock is a queue endpoint passed to get().  Otherwise it
   is a connected stream socket, and the call blocks until the whole message
   has arrived.  A stream socket may deliver it in several pieces, and
   signals may interrupt recv.  Exits the process on a receive error or if
   the peer closes the connection.  When SHOW_TRIP_TIME is nonzero, logs the
   time elapsed since the sender's timestamp. */
static struct data recv_value (int sock)
{
    struct data data;
#ifdef NO_SOCKETS
    data = get (sock);
#else
    char *p = (char *) &data;
    size_t left = sizeof (data);

    while (left > 0) {
        ssize_t n = recv (sock, p, left, 0);

        if (n < 0) {
            if (errno == EINTR)
                continue;
            err (1, "recv");
        }
        if (n == 0)
            errx (1, "recv: connection closed by peer");

        p += n;
        left -= (size_t) n;
    }
#endif
    if (SHOW_TRIP_TIME) {
        double sec;
        struct timeval tv1;
        struct timeval tv2;

        if (gettimeofday (&tv2, NULL) < 0)
            err (1, "recv_value:gettimeofday");

        tv1 = data.tv;
        sec = (tv2.tv_sec + tv2.tv_usec * 1e-6)
            - (tv1.tv_sec + tv1.tv_usec * 1e-6);
        dolog2 ("%c %d in flight %f secs\n", data.c, sock, sec);
    }
    return data;
}

/* Send one struct data message carrying c and the current
   gettimeofday() timestamp.

   With NO_SOCKETS the message goes through put(sock, ...).  Otherwise the
   whole message is written to the connected socket sock, retrying on
   partial writes and EINTR.  Any error exits the process. */
static void send_value (int sock, char c)
{
    struct data data;

    /* Zero the whole struct, so the padding between c and tv is not sent
       as uninitialized stack memory. */
    memset (&data, 0, sizeof (data));
    data.c = c;
    if (gettimeofday (&data.tv, NULL) < 0)
        err (1, "send_value:gettimeofday");

#ifdef NO_SOCKETS
    put (sock, &data);
#else
    const char *p = (const char *) &data;
    size_t left = sizeof (data);

    while (left > 0) {
        ssize_t n = send (sock, p, left, 0);

        if (n < 0) {
            if (errno == EINTR)
                continue;
            err (1, "send");
        }
        if (n == 0)
            errx (1, "send: no progress");

        p += n;
        left -= (size_t) n;
    }
#endif
}

/* select thread
   invoke select on clients fd, upon return from select read the value
   and call notifier */
static void *select_loop (void *arg)
{
    struct select_arg *s = arg;

    for (;;) {
        struct data data;

#ifdef NO_SOCKETS
        data = recv_value (s->client->sock);
        s->notifier (s->server, s->client, &data);
#else
#ifdef NO_SELECT
        data = recv_value (s->client->sock);
        s->notifier (s->server, s->client, &data);
#else
        int n;
        fd_set rdset;

        FD_ZERO (&rdset);
        FD_SET (s->client->sock, &rdset);
        n = select (s->client->sock + 1, &rdset, NULL, NULL, NULL);

        if (n < 0)
            err (1, "select");

        if (n != 1)
            fprintf (stderr, "weird, select returned %d\n", n);

        if (FD_ISSET (s->client->sock, &rdset)) {
            data = recv_value (s->client->sock);
            s->notifier (s->server, s->client, &data);
        }
        else
            fprintf (stderr, "weird, no sock in rdset\n");
#endif
#endif
    }

    return NULL;
}

/* Try to sleep usec microseconds.

   A non-positive usec returns immediately.  Values of a second or more are
   split into whole seconds plus a sub-second remainder.  nanosleep rejects
   tv_nsec >= 1e9, and select may reject an out-of-range tv_usec.  With
   USE_NANOSLEEP, a sleep cut short by a signal is resumed for the remaining
   time.  Any other failure exits the process. */
static void do_usleep (int usec)
{
    if (usec <= 0)
        return;

#ifdef USE_NANOSLEEP
    struct timespec req, rem;

    req.tv_sec = usec / 1000000;
    req.tv_nsec = (long) (usec % 1000000) * 1000;

    while (nanosleep (&req, &rem) < 0) {
        if (errno != EINTR)
            err (1, "do_usleep:nanosleep");
        fprintf (stderr, "interrupted by signal\n");
        req = rem;
    }
#else
    struct timeval tv;

    tv.tv_sec = usec / 1000000;
    tv.tv_usec = usec % 1000000;

    if (select (0, NULL, NULL, NULL, &tv))
        err (1, "do_usleep:select");
#endif
}

/* Sleep thread body (arg is a struct sleep_arg *): call the notifier once
   every 40 ms, i.e. 25 times a second.

   Wake-ups are scheduled on a fixed timeline anchored at tv1, which is
   advanced by exactly one period per tick.  If more than one period has
   already elapsed past the scheduled point, that tick is dropped, the miss
   is logged, and the timeline restarts from the current time.  Never
   returns. */
static void *sleep_loop (void *arg)
{
    enum { PERIOD_USEC = 40000 };               /* tick period in microseconds */
    const double period = PERIOD_USEC / 1e6;    /* the same period in seconds */
    struct sleep_arg *s = arg;
    struct timeval tv1, tv2;

    if (gettimeofday (&tv1, NULL))
        err (1, "sleep_loop:gettimeofday");

    for (;;) {
        double sec;

        if (gettimeofday (&tv2, NULL))
            err (1, "sleep_loop:gettimeofday2");

        /* time elapsed since the scheduled tick point tv1 */
        sec = (tv2.tv_sec + tv2.tv_usec * 1e-6)
            - (tv1.tv_sec + tv1.tv_usec * 1e-6);
        if (sec > period) {
            dolog2 ("missed completely %f wakeups\n", sec / period);
            tv1 = tv2;
            continue;
        }

        /* The wall clock was stepped back by more than a period.  Resync
           instead of sleeping for the size of the jump; a large jump would
           also overflow the int passed to do_usleep.  Slightly negative
           values can occur normally, because the sleep below is truncated
           to whole microseconds. */
        if (sec < -period) {
            tv1 = tv2;
            continue;
        }

        do_usleep ((int) ((period - sec) * 1e6));
        s->notifier (s->server);

        /* advance the schedule by exactly one period, keeping tv_usec normalized */
        tv1.tv_usec += PERIOD_USEC;
        while (tv1.tv_usec > 999999) {
            tv1.tv_sec += 1;
            tv1.tv_usec -= 1000000;
        }
    }

    return NULL;
}

/* Run f(arg) on a new thread with default attributes.  The thread handle is
   discarded (the thread is neither joined nor detached).  Exits the process
   if the thread cannot be created. */
static void spawn (void *(*f) (void *), void *arg)
{
    pthread_t tid;
    int rc;

    rc = pthread_create (&tid, NULL, f, arg);
    errno = rc;         /* pthread_create returns the error code; err() reports errno */
    if (rc != 0)
        err (1, "pthread_create");
}

/* Select-thread notifier: append (client, *data) as a new event to
   server->head and wake the main server thread.

   The event is allocated and filled in before the lock is taken; it stays
   private until it is inserted, which keeps the critical section short.
   The main server thread frees it once consumed.  Allocation or pthread
   errors exit the process. */
static void select_notifier (struct server *server, struct client *client,
                             struct data *data)
{
    struct event *event;

    /* calloc takes (count, element size) */
    event = calloc (1, sizeof (*event));
    if (!event)
        err (1, "calloc");

    event->client = client;
    event->data = *data;

    if ((errno = pthread_mutex_lock (&server->mutex)))
        err (1, "select:pthread_mutex_lock");

    TAILQ_INSERT_TAIL (&server->head, event, entries);

    if ((errno = pthread_cond_signal (&server->cond)))
        err (1, "select:pthread_cond_signal");

    if ((errno = pthread_mutex_unlock (&server->mutex)))
        err (1, "select:pthread_mutex_unlock");
}

/* Sleep-thread notifier, called once per sleep period: advance server->seq
   under the server mutex and wake the main server thread.  Any pthread
   error exits the process. */
static void sleep_notifier (struct server *server)
{
    pthread_mutex_t *lock = &server->mutex;

    if ((errno = pthread_mutex_lock (lock)) != 0)
        err (1, "sleep:pthread_mutex_lock");

    ++server->seq;

    if ((errno = pthread_cond_signal (&server->cond)) != 0)
        err (1, "sleep:pthread_cond_signal");

    if ((errno = pthread_mutex_unlock (lock)) != 0)
        err (1, "sleep:pthread_mutex_unlock");
}

/* main server thread */

/* Number of main-loop wake-ups in which the server's sequence number had
   advanced; reported at exit by server_atexit. */
static int global_server_received;

/* atexit hook: print the server's counter to stdout. */
static void server_atexit (void)
{
    fprintf (stdout, "server received %d\n", global_server_received);
}

/* Main server thread.

   Accepts one client connection on PORT.  Starts the select thread
   (select_loop feeding select_notifier) and the sleep thread (sleep_loop
   feeding sleep_notifier).  Then loops forever: wait on server.cond and
   process the event queue as described in the comment at the top of this
   file.

   arg: when non-NULL, the process exits with status 0 after the 500th
   wake-up in which server.seq had advanced.  main passes (void *) -1 in
   "both" mode and 0 in "server" mode.  Never returns. */
static void *server (void *arg)
{
    /* Starts set, so the client's very first 'L' can be granted without a
       preceding 'U'. */
    int seen_U = 1;
    int old_seq;
    /* NOTE: this local shadows the function's own name inside the body. */
    struct server server;
    struct client client;
    struct select_arg select_arg;
    struct sleep_arg sleep_arg;

    atexit (server_atexit);

    server.seq = 0;
    TAILQ_INIT (&server.head);

    if ((errno = pthread_mutex_init (&server.mutex, NULL)))
        err (1, "pthread_mutex_init");

    if ((errno = pthread_cond_init (&server.cond, NULL)))
        err (1, "pthread_cond_init");

    /* The mutex stays locked from here on and is released only inside
       pthread_cond_wait.  The notifier threads can therefore change
       server.seq and server.head only while this thread is waiting. */
    if ((errno = pthread_mutex_lock (&server.mutex)))
        err (1, "pthread_mutex_lock");

    client = do_accept (PORT);

    select_arg.server = &server;
    select_arg.client = &client;
    select_arg.notifier = select_notifier;

    sleep_arg.server = &server;
    sleep_arg.notifier = sleep_notifier;

    /* The argument structs live in this stack frame; that is only safe
       because the loop below never returns. */
    spawn (select_loop, &select_arg);
    spawn (sleep_loop, &sleep_arg);

    for (;;) {
        struct event *event, *temp_event;

        /* Snapshot seq so a sleep-thread tick can be told apart from a
           select-thread event.  The wait has no predicate loop; a spurious
           wake-up just causes one more pass over the queue. */
        old_seq = server.seq;
        if ((errno = pthread_cond_wait (&server.cond, &server.mutex)))
            err (1, "pthread_cond_wait");

        /* Count ticks; when arg is non-NULL, stop after 500 of them. */
        if (old_seq != server.seq) {
            global_server_received += 1;
            if (arg)
                if (global_server_received == 500)
                    exit (0);
        }

        dolog ("old=%d new=%d client=%d seen_U=%d\n",
               old_seq, server.seq, client.seq, seen_U);

        /* Pass 1: consume every queued 'U' and remember that one was seen.
           With SPEEDY_RACER, also echo a 'U' back to the client.  The next
           pointer is saved first because the current event may be freed. */
        event = server.head.tqh_first;
        while (event) {
            temp_event = event->entries.tqe_next;

            if (event->data.c == 'U') {
                if (!seen_U) {
                    seen_U = 1;
                }
#ifdef SPEEDY_RACER
                send_value (event->client->sock, 'U');
#endif
                TAILQ_REMOVE (&server.head, event, entries);
                free (event);
            }

            event = temp_event;
        }

        if (seen_U) {
            /* Pass 2: find the oldest queued 'L'. */
            event = server.head.tqh_first;
            while (event) {
                if (event->data.c == 'L') {
                    break;
                }
                event = event->entries.tqe_next;
            }

            /* Grant it only if seq has advanced since this client's last
               grant, i.e. at most one grant per tick.  Otherwise it stays
               queued for a later wake-up. */
            if (event) {
                if (event->client->seq < server.seq) {
                    /* more than one tick passed since the previous grant */
                    if (event->client->seq + 1 != server.seq) {
                        fprintf (stderr, "server %d client %d\n",
                                 server.seq, event->client->seq);
                    }

                    send_value (event->client->sock, 'L');
                    event->client->seq = server.seq;
                    seen_U = 0;

                    TAILQ_REMOVE (&server.head, event, entries);
                    free (event);
                }
            }
        }
        else {
            /* No 'U' since the last grant.  A queued 'L' means the client
               asked again without releasing.  This is treated as a protocol
               violation and stops the process (note: with exit status 0). */
            event = server.head.tqh_first;
            while (event) {
                if (event->data.c == 'L') {
                    fprintf (stderr, "whoops client has L but not U\n");
                    exit (0);
                }
                event = event->entries.tqe_next;
            }
        }
    }
}

/* client thread */

/* Number of 'L' replies the client has received from the server; reported
   at exit by client_atexit. */
static int global_client_received;

/* atexit hook: print the client's counter to stdout. */
static void client_atexit (void)
{
    fprintf (stdout, "client received %d\n", global_client_received);
}

/* SIGINT/SIGQUIT handler: terminate the process with a success status, so
   the registered atexit hooks print their counters.
   NOTE(review): exit() is not async-signal-safe; this relies on the
   interrupted code not holding stdio or malloc locks. */
static void handle_signal (int signo)
{
    (void) signo;           /* one handler serves both signals */
    exit (EXIT_SUCCESS);
}

/* Spin (without sleeping) until more than 9 ms of wall-clock time has
   passed.  The client loops call it between receiving the 'L' and sending
   the 'U', to keep the CPU busy for a while.  Exits the process if
   gettimeofday fails. */
static void busy_loop (void)
{
    const double busy_secs = 0.009;
    struct timeval start, now;
    double sec;

    if (gettimeofday (&start, NULL))
        err (1, "busy_loop:gettimeofday1");

    do {
        if (gettimeofday (&now, NULL))
            err (1, "busy_loop:gettimeofday2");
        /* subtract the integer seconds first, so no precision is lost in
           converting large epoch values to double */
        sec = (double) (now.tv_sec - start.tv_sec)
            + (now.tv_usec - start.tv_usec) * 1e-6;
    } while (sec <= busy_secs);
}

static void __attribute__ ((unused))
    uncluttered_client_loop (int sock)
{
    int run;

    for (run = 0; ; ++run) {
        struct data data;

        send_value (sock, 'L');
        data = recv_value (sock);
        assert (data.c == 'L');
        global_client_received += 1;
        busy_loop ();
        send_value (sock, 'U');

#ifdef SPEEDY_RACER
        data = recv_value (sock);
        assert (data.c == 'U');
#endif
        do_usleep (20000);
    }
}

/* Instrumented client loop (the one client() runs).

   Per iteration: send 'L', wait for the server's reply and assert it is
   'L', count it, busy-loop, send 'U' (with SPEEDY_RACER, also read the
   server's 'U' echo), then sleep 20 ms.  Every step is bracketed by nested
   TIMING_START/TIMING_END pairs.  On every 50th iteration (COND, starting
   with the first) the timings are printed to stdout, followed by a blank
   line.  Never returns. */
static void client_loop (int sock)
{
    int run;

    for (run = 0; ; ++run) {
        struct data data;
        /* true on every 50th iteration; note: never #undef'd, so it stays
           defined for the rest of the file */
#define COND !(run % 50)

        /* "whole": the entire iteration */
        TIMING_START {
            /* "trip": the 'L' request and the server's reply */
            TIMING_START {
                TIMING_START
                    send_value (sock, 'L');
                TIMING_END (send, COND);

                TIMING_START
                    data = recv_value (sock);
                TIMING_END (recv, COND);
                assert (data.c == 'L');
            } TIMING_END (trip, COND);

            /* "work": everything after the 'L' has arrived */
            TIMING_START {
                global_client_received += 1;

                TIMING_START
                    busy_loop ();
                TIMING_END (busy, COND);

                /* "last": releasing with 'U' plus the 20 ms sleep */
                TIMING_START {
                    /* "trip2": sending 'U' and, with SPEEDY_RACER,
                       reading the server's echo */
                    TIMING_START {
                        TIMING_START
                            send_value (sock, 'U');
                        TIMING_END (send2, COND);

#ifdef SPEEDY_RACER
                        TIMING_START
                            data = recv_value (sock);
                        TIMING_END (recv2, COND);
                        assert (data.c == 'U');
#endif
                    } TIMING_END (trip2, COND);

                    TIMING_START
                        do_usleep (20000);
                    TIMING_END (sleep, COND);
                } TIMING_END (last, COND);
            } TIMING_END (work, COND);
        } TIMING_END (whole, COND);

        /* separate each printed group of timings */
        if (COND)
            puts ("");
    }
}

/* Client thread: connect to the server and run client_loop forever.

   vhostname: the server's host name (a const char *).  It is ignored with
   NO_SOCKETS, where endpoint 1 is the client side of the in-process queues.
   Registers client_atexit and SIGINT/SIGQUIT handlers, so the number of 'L'
   replies received is printed when the process exits.  Exits the process if
   the name cannot be resolved to an IPv4 address or the connection fails. */
static void *client (void *vhostname)
{
    int sock;
#ifndef NO_SOCKETS
    const char *hostname = vhostname;
    struct hostent *hostinfo;
    struct sockaddr_in name;
#endif

    atexit (client_atexit);
    signal (SIGINT, handle_signal);
    signal (SIGQUIT, handle_signal);

#ifdef NO_SOCKETS
    (void) vhostname;
    sock = 1;
#else
    /* gethostbyname reports failures through h_errno, not errno, so err()
       would append an unrelated errno message; use errx() instead. */
    hostinfo = gethostbyname (hostname);
    if (!hostinfo || !hostinfo->h_addr_list[0])
        errx (1, "gethostbyname: cannot resolve %s", hostname);
    if (hostinfo->h_addrtype != AF_INET
        || hostinfo->h_length != (int) sizeof (name.sin_addr))
        errx (1, "gethostbyname: %s is not an IPv4 host", hostname);

    /* zero sin_zero and any padding before filling in the fields */
    memset (&name, 0, sizeof (name));
    name.sin_family = AF_INET;
    name.sin_port = htons (PORT);
    /* h_addr_list[0] is the portable spelling of the legacy h_addr macro */
    memcpy (&name.sin_addr, hostinfo->h_addr_list[0], sizeof (name.sin_addr));

    sock = socket (PF_INET, SOCK_STREAM, 0);
    if (sock < 0)
        err (1, "socket");

    if (connect (sock, (struct sockaddr *) &name, sizeof (name)) < 0)
        err (1, "connect");
    dolog2 ("client sock = %d\n", sock);
#endif

    client_loop (sock);
    return NULL;
}

/* Entry point.  Modes:
     both                   run server and client in one process for 30 s;
                            the server exits by itself after 500 ticks
     server                 run only the server (never returns)
     client server-address  run only the client (never returns)
   With NO_SOCKETS only "both" is available.  A bad command line exits with
   a usage message. */
int main (int argc, char **argv)
{
    int both = argc == 2 && strcmp (argv[1], "both") == 0;

    if (both) {
        /* any non-NULL argument makes the server stop after 500 ticks */
        spawn (server, (void *) -1);
        spawn (client, "localhost");
        sleep (30);
        return 0;
    }

#ifdef NO_SOCKETS
    errx (1, "command line must be: [both]");
#else
    if (argc == 2 && strcmp (argv[1], "server") == 0)
        server (NULL);
    else if (argc == 3 && strcmp (argv[1], "client") == 0)
        client (argv[2]);
    else
        errx (1, "command line must be: "
              "[server], [client server-address] or [both]");
#endif

    return 0;
}
