#include <cmath>
#include <errno.h>

#include <boost/thread/xtime.hpp>

#include <vos/vutil/log.hh>
#include <vos/vutil/timer.hh>
#include <vos/vutil/getaddrinfowrapper.hh>

#include <vos/vip/connection.hh>
#include <vos/vip/lowlatencyproto.hh>
#include <vos/vip/stdprotocol.hh>
#include <vos/vip/socketmultiplexer.hh>

/*
  Sequence of events involved in establishing a connection:

  - Call sendSYN().  This creates a SYN packet containing *all* the
  starting sequence numbers; puts the connection in WaitingForReply
  state; keeps sending the SYN every four seconds until the state
  changes or it times out (after 8 tries).

  - The other side gets the SYN packet and calls setupNewConnection()
  in the socket thread.

  - The socket thread creates a new connection structure (which starts
  out in the Opening state) and calls handleSYN()

  - Because it is in the Opening state, the connection calls
  setupWithSYN().

  - setupWithSYN() calls setupWithSYN() on each protocol.

  - Then it creates a SYN reply and sends it back

  - The client now gets the SYN reply and passes it to handleSYN().

  - The client is in the WaitingForReply state, so it also calls
  setupWithSYN() on the reply.  It then calls replySYN() to check that the
  reply matches up with what we sent (the reply it builds is not sent).
  Then the state goes to Connected and we wake up.
*/

/* Teardown:

   - We get a "VIP_RST" control from the other side.  In this case,
   the connection is shut down immediately.

   - If the application calls close(), there are a couple of options.
   There is the "linger" option in which case the connection will
   actually persist until all currently queued data has been sent
   (without lingering, the connection will be shut down immediately
   with no regard to queued data).  Also one can have close() block
   until the connection is actually closed (or have it happen in the
   background).
 */

/* Ping algorithm:

  - If we haven't sent anything for VIP_CONNECTION_TIMEOUT/2 seconds,
  send a ping (an empty data packet with control VIP_PING)

  - The other side sees the VIP_PING packet and replies with an empty
  VIP_DATA packet

  - Keep trying every VIP_PING_WAIT_TIME seconds until we receive any
  packet from the other side

  - if we still haven't received anything for VIP_CONNECTION_TIMEOUT
  seconds, disconnect
*/


using namespace VUtil;
using namespace VIP;

// Byte budget for low-latency pacing: (_dwt * _biggestFlight) divided by
// (SRTT + 2*RTTVAR - bestPing), minus the bytes already in flight.  Before
// the first RTT sample (SRTT == 0) one full packet is allowed.  Every
// argument is parenthesised so that expression arguments bind as a unit.
// NOTE(review): the denominator is not guarded against zero or negative
// values; callers cast the result to int.
#define CANSEND(_SRTT, _dwt, _biggestFlight, _RTTVAR, _bestPing, _flightSize) \
    ((_SRTT) == 0 ? VIP_MAXPKTSIZE \
                  : ((((_dwt) * (_biggestFlight)) / ((_SRTT) + (_RTTVAR)*2 - (_bestPing))) - (_flightSize)))

// Minimum byte budget granted to a non-low-latency chunk (see gatherData()).
#define MINCANSEND (VIP_MAXPKTSIZE/4)

// Fallback for platforms whose math library lacks fmax().
#ifndef HAVE_FMAX
# define fmax(_x, _y) ((_x) > (_y) ? (_x) : (_y))
#endif

// A default message targets the standard protocol on channel 0, has
// message id 0 and no completion callback.
Message::Message()
    : ToS(VIP_STDPROTO),
      channel(0),
      msgid(0),
      completedCallback(0)
{
}

// Functor used as the body of the per-connection sender thread.
// (One thread per connection keeps the design simple; a shared sender
// would scale better.)
struct SenderThread
{
    Connection* c;

    SenderThread() { }
    SenderThread(const SenderThread& other) : c(other.c) { }

    // Hold a reference on the connection for as long as senderLoop() runs.
    void operator()()
    {
        c->acquire();
        c->senderLoop();
        c->release();
    }
};

/*
  Create a connection to the peer at `r` over socket `s`.

  The connection starts in the Opening state with the low-latency and
  standard protocols installed in their slots.  The sender thread
  (senderLoop()) is started as the last step.
*/
Connection::Connection(int s,
                       sockaddr_in* r,
                       SocketMultiplexer* sm)
    : skt(s), state(Opening),
      callback(0), disconCB(0), needSender(false), pktCounter(0),
      lastRecvPkt(0),
      SRTT(0), RTTVAR(0), RTO(3), bestPing(100000), cwnd(VIP_MAXPKTSIZE * 2),
      bytesSent(0), flightSize(0), biggestFlight(VIP_MAXPKTSIZE * 2),
      ssthresh(8 * VIP_MAXPKTSIZE),
      inLossRecovery(false), lastAckTime(0), LLPactive(true), starvationClock(0),
      lastSendTime(getTimer()), lastRecvTime(getTimer()),
      nextPing(VIP_CONNECTION_TIMEOUT/2), lastLLP(0), noRST(false), sktmpx(sm)
{
    memcpy(&remoteaddr, r, sizeof(sockaddr_in));

    for(int i = 0; i < VIP_PROTOSLOTS; i++) protos[i] = 0;

    protos[VIP_LOWLATENCY] = new LowLatencyProto(this);
    protos[VIP_STDPROTO] = new StandardProtocol(this);

    // The initializer list above does not set these flags.  fastRecovery()
    // reads inFastRecovery, and killSender() may be called before the
    // sender thread runs, so give both a defined starting value.
    inFastRecovery = false;
    dieSenderDie = false;

    // Start the sender thread only after every member it touches is set up.
    // NOTE(review): this relies on boost::thread's destructor detaching the
    // still-running thread.  Some Boost.Thread configurations terminate
    // instead, so confirm which one is in use.
    SenderThread st;
    st.c = this;
    boost::thread t(st);
}

Connection::~Connection()
{
    LOG("VIP", 3, this << ": deleting connection " << this);

    // cleanUp() normally deletes the protocol objects.  It only runs when
    // senderLoop() finishes in the Closed state, which does not happen for
    // example after killSender().  Free anything still allocated so those
    // paths do not leak.  cleanUp() nulls the slots it frees, so nothing is
    // deleted twice.
    for(int i = 0; i < VIP_PROTOSLOTS; i++) {
        if(protos[i]) {
            delete protos[i];
            protos[i] = 0;
        }
    }
}

/*
  Process one incoming VIP data/ping packet.

  Packet layout:
  - A 9-byte header packed as "cll": a control byte, the peer's counter for
    this packet, and the highest of our packet counters the peer has seen.
  - A sequence of chunks.  Each chunk's first byte is a protocol slot.

  The ack counter is matched against sendTimes to take an RTT sample.  That
  sample updates bestPing, SRTT, RTTVAR and RTO, using alpha = 1/8,
  beta = 1/4 and K = 4 as in RFC 6298.

  A VIP_PING control is answered with an empty data packet.

  Each chunk is handed to its protocol's handleChunk(), which reports how
  many bytes it consumed.

  Packets are ignored unless the connection is Connected or Lingering.
*/
void Connection::splitIntoChunks(uint8_t* buf, unsigned int sz)
{
    if(state != Connected && state != Lingering) return;

    lastRecvTime = getTimer();

    uint8_t control;
    uint32_t recvPktCounter, ackPktCounter;
    unsigned int ptr = structunpack(buf, sz, "cll",
                                    &control,
                                    &recvPktCounter,
                                    &ackPktCounter);

    LOG("VIP", 4, this << ": Got " << sz << " bytes receiving pktcounter " << recvPktCounter
        << " last acked pkt " << ackPktCounter);

    // Each received packet releases one header's worth (9 bytes) of
    // flightSize.  resetRecovery=false leaves cwnd untouched.
    if(totalBytesSent() > 0) ackBytes(9, false);

    {
        boost::mutex::scoped_lock lk(sendTimes_mutex);

        if(recvPktCounter > lastRecvPkt) lastRecvPkt = recvPktCounter;

        unsigned int i;
        for(i = 0; i < sendTimes.size(); i++) {
            LOG("VIP", 6, this << ": st is " << sendTimes[i].pkt << " apkt is " << ackPktCounter);
            if(sendTimes[i].pkt == ackPktCounter) {
                if(sendTimes[i].tm > lastSendTime) lastSendTime = sendTimes[i].tm;
                nextPing = VIP_CONNECTION_TIMEOUT/2;
                break;
            }
        }

        LOG("VIP", 6, this << ": got pkt of " << sz << " bytes");

        if(i < sendTimes.size()) {
            // Discard send records older than the acked packet.  The acked
            // packet's record is now at the front.
            for(; i; i--) sendTimes.pop_front();

            LOG("VIP", 6, this << ": RTTVAR is " << RTTVAR);
            double R = getTimer() - sendTimes[0].tm;

            LOG("VIP", 4, this << ": ping time is " << R);

            // bestPing follows new minima immediately.  It drifts slowly
            // (1% per sample) towards larger samples.
            if(R < bestPing) bestPing = R;
            if(R > bestPing) {
                double a = .01;
                bestPing = (1 - a) * bestPing + a * R;
            }

            const double G = 2.5;
            const double K = 4;

            if(SRTT == 0) {
                // First RTT sample.
                SRTT = R;
                RTTVAR = R/2.0;
            } else {
                const double alpha = .125;
                const double beta = .25;

                // RTTVAR must be updated using the previous SRTT.
                RTTVAR = (1 - beta) * RTTVAR + beta * fabs(SRTT - R);
                SRTT = (1 - alpha) * SRTT + alpha * R;
            }

            RTO = SRTT + fmax(G, K*RTTVAR);

            LOG("VIP", 5, this << ": new estimated RTT is " << SRTT
                << " new retransmission time out is " << RTO);

            sendTimes.pop_front();
        } else {
            LOG("VIP", 5, this << ": got pkt but id " << ackPktCounter
                << " is unknown for RTT calc");
        }
    }

    if(control == VIP_PING) {
        LOG("VIP", 4, this << " got ping packet");
        uint8_t outgoing[9];
        unsigned int s = 9;
        unsigned int fs;
        dispatchData(outgoing, s, fs, false);
    } else if(sz == 9) {
        LOG("VIP", 4, this << " got ping reply");
    }

    while(ptr < sz) {
        LOG("VIP", 5, this << ": delivering chunk of size " << (sz - ptr)
            << " to proto " << (int)buf[ptr]);
        if(buf[ptr] < VIP_PROTOSLOTS && protos[buf[ptr]]) {
            unsigned int ch = sz - ptr;
            protos[buf[ptr]]->handleChunk(buf + ptr, &ch);
            // A handler that consumes nothing would otherwise make this loop
            // spin forever on the same byte of a (possibly malformed) packet.
            if(ch == 0) break;
            ptr += ch;
        } else break;
    }

#if 0
    {
        boost::recursive_mutex::scoped_lock lk(cwnd_mutex);
        boost::recursive_mutex::scoped_lock lk2(flightSize_mutex);

        LOG("VIP", 4, this << ": flightSize is " << flightSize
            << ", cwnd is " << cwnd
            << ", diff is " << (cwnd - flightSize));
    }
#endif

    wakeupSender();
}

bool Connection::handleSYN(uint8_t* buf, unsigned int sz)
{
    lastRecvTime = getTimer();

    if(state == Opening || state == WaitingForReply)
        setupWithSYN(buf, sz);

    uint8_t outgoing[VIP_MAXPKTSIZE];
    unsigned int n = sizeof(outgoing);

    if(!replySYN(buf, sz, outgoing, &n)) return false;

    if(state != WaitingForReply) {
        sendto(skt, (char*)outgoing, n, 0, (struct sockaddr*)&remoteaddr,
               (socklen_t)sizeof(sockaddr_in));
    }

    state = Connected;
    wakeupForSYN();

    return true;
}



/*
  Build this side's SYN packet into `outgoing`.

  Layout, in order:
  - The control byte VIP_SYN.
  - A count byte `npro`: the number of non-null protocol slots.
  - `npro` 2-byte offsets ("s" fields).
  - Each protocol's own SYN data, written by protos[i]->makeSYN() at the
    offset recorded for it.

  On entry *osz is the capacity of `outgoing`.  On return it is the number
  of bytes written.
  NOTE(review): nothing checks that dptr stays below *osz.  If the protocol
  SYNs overflowed the buffer, `*osz - dptr` would wrap around.
*/
void Connection::makeSYN(uint8_t* outgoing, unsigned int* osz)
{
    unsigned int ptr = structpack(outgoing, *osz, "c", (char)VIP_SYN);

    // Count the installed protocols so the offset table size is known.
    int npro = 0;
    for(int i = 0; i < VIP_PROTOSLOTS; i++) {
        if(protos[i]) npro++;
    }

    ptr += structpack(outgoing + ptr, *osz - ptr,
                      "c", (char)npro);

    // Protocol data starts right after the table of 2-byte offsets.
    uint16_t dptr = ptr + (2 * npro);

    for(int i = 0; i < VIP_PROTOSLOTS; i++) {
        if(protos[i]) {
            LOG("VIP", 5, this << ": writing offset " << dptr << " at SYN offset " << ptr);
            // Record where this protocol's data begins, then let it write there.
            ptr += structpack(outgoing + ptr, 2, "s", dptr);

            unsigned int sz = *osz - dptr;
            protos[i]->makeSYN(outgoing + dptr, &sz);

            LOG("VIP", 5, this << ": proto at ofs " << dptr << " is " << (int)outgoing[dptr]);

            dptr += sz;
        }
    }

    LOG("VIP", 5, this << ": created SYN " << dptr << " bytes long with " << npro << " " << ptr);

    *osz = dptr;
}

/*
  Parse the offset table at the start of a SYN packet.

  Fills offsets[slot] with the byte offset (into `incoming`) of each listed
  protocol's SYN data.  Slots that are not listed are left 0.

  The table follows the control and count bytes and holds one 2-byte "s"
  field per entry.  The protocol slot for an entry is the first byte at
  that offset.  Entries that point outside the packet, or that name an
  out-of-range slot, are ignored.  Parsing stops if the table itself is
  truncated.
*/
void Connection::getOffsetsSYN(uint8_t* incoming, unsigned int isz,
                               uint16_t offsets[VIP_PROTOSLOTS])
{
    uint8_t control, chunks;

    // `offsets` is really a pointer here (array parameters decay), so
    // sizeof(offsets) would only cover sizeof(uint16_t*) bytes.  Clear the
    // whole table explicitly.
    memset(offsets, 0, sizeof(uint16_t) * VIP_PROTOSLOTS);

    unsigned int ptr = structunpack(incoming, isz, "cc", &control, &chunks);

    for(uint8_t i = 0; i < chunks; i++) {
        // Stop rather than read a 2-byte offset past the end of the packet.
        if(ptr + 2 > isz) break;

        uint16_t ofs = 0;
        ptr += structunpack(incoming + ptr, isz - ptr, "s", &ofs);
        if(ofs < isz) {
            // Only dereference incoming[ofs] once it is known to be in range.
            LOG("VIP", 5, this << ": reading offset " << ofs << " for proto " << (int)incoming[ofs]);
            if(incoming[ofs] < VIP_PROTOSLOTS) {
                offsets[incoming[ofs]] = ofs;
            }
        }
    }
}

/*
  Build the SYN reply for an incoming SYN.

  For every local protocol that also appears in the incoming SYN, this
  calls protos[i]->replySYN() on that protocol's part of `incoming`.  The
  replies are packed into `outgoing` in the same layout makeSYN() uses:
  VIP_SYN, a chunk count, a table of 2-byte offsets, then the per-protocol
  data.

  On entry *osz is the capacity of `outgoing`.  On success, *osz is set to
  the reply length and true is returned.  If any protocol rejects the SYN,
  *osz is set to 0 and false is returned.
*/
bool Connection::replySYN(uint8_t* incoming, unsigned int isz,
                          uint8_t* outgoing, unsigned int* osz)
{
#ifdef WTF
    LOG("VIP", 1, "incoming: ");
    for(int i = 0; i < isz; i++) {
        fprintf(stderr, "%x", incoming[i]);
    }
#endif

    uint16_t offsets[VIP_PROTOSLOTS];

    getOffsetsSYN(incoming, isz, offsets);

    int ptr = structpack(outgoing, *osz, "c", (char)VIP_SYN);

    // Only protocols present on both sides get a chunk in the reply, so count
    // exactly those (same condition as the loop below).  Counting every local
    // protocol would leave unwritten entries in the offset table whenever the
    // peer lacks one of them, and the peer would parse those bytes as offsets.
    int npro = 0;
    for(int i = 0; i < VIP_PROTOSLOTS; i++) {
        if(protos[i] && offsets[i]) npro++;
    }

    ptr += structpack(outgoing + ptr, *osz - ptr,
                      "c", (char)npro);

    // Protocol data starts right after the table of 2-byte offsets.
    uint16_t dptr = ptr + (2 * npro);

    for(int i = 0; i < VIP_PROTOSLOTS; i++) {
        if(protos[i] && offsets[i]) {
            ptr += structpack(outgoing + ptr,
                       *osz - ptr,
                       "s", dptr);

            LOG("VIP", 5, this << ": calling proto->replySYN with " << isz << " " << offsets[i]);

            unsigned int sz = *osz - dptr;
            if(!protos[i]->replySYN(incoming + offsets[i],
                                    isz - offsets[i],
                                    outgoing + dptr,
                                    &sz))
            {
                *osz = 0;
                return false;
            }
            dptr += sz;
        }
    }

    *osz = dptr;

    return true;
}

/*
  Configure each local protocol from its section of an incoming SYN.

  Uses getOffsetsSYN() to locate each protocol's data.  For every protocol
  present on both sides, passes that data and the remaining packet length
  to protos[i]->setupWithSYN().
*/
void Connection::setupWithSYN(uint8_t* incoming, unsigned int isz)
{
    uint16_t offsets[VIP_PROTOSLOTS];

    getOffsetsSYN(incoming, isz, offsets);

    for(int i = 0; i < VIP_PROTOSLOTS; i++) {
        // An offset of 0 means the peer did not include this protocol.
        if(protos[i] && offsets[i]) {
            LOG("VIP", 6, this << ": offsets[i] is " << offsets[i]);
            protos[i]->setupWithSYN(incoming + offsets[i],
                                    isz - offsets[i]);
        }
    }
    // NOTE: this #undef stays in effect for the rest of the file, so later
    // WTF debug dumps (e.g. in sleepForSYN()) are compiled out.  The block
    // right below is therefore dead code as well.
#undef WTF
#ifdef WTF
    LOG("VIP", 1, "incoming: ");
    for(int i = 0; i < isz; i++) {
        fprintf(stderr, "%x", incoming[i]);
    }
#endif
}

void Connection::sendSYN()
{
    uint8_t outgoing[VIP_MAXPKTSIZE];

    unsigned int n = sizeof(outgoing);
    makeSYN(outgoing, &n);

    state = WaitingForReply;

    for(int i = 0; i < 8 && state == WaitingForReply; i++) {
        sleepForSYN(outgoing, n);
    }
}

// timed_wait() predicate used by sleepForSYN().  It is satisfied as soon as
// the watched state is no longer WaitingForReply.
struct WaitForConnection
{
    Connection::ConnectState* cs;

    WaitForConnection(Connection::ConnectState* b) : cs(b) { }

    bool operator()()
    {
        const bool stillWaiting = (*cs == Connection::WaitingForReply);
        return !stillWaiting;
    }
};

/*
  Send one SYN attempt and wait for the handshake to progress.

  Sends `outgoing[0..n)` to the peer, then waits on SYN_cond until either
  VIP_CONNECT_WAIT_TIME seconds pass or the state leaves WaitingForReply.
  That state change is signalled by wakeupForSYN().  If sendto() fails, the
  error is logged and the function returns immediately without waiting.
*/
void Connection::sleepForSYN(uint8_t* outgoing, unsigned int n)
{
    boost::mutex::scoped_lock lk(SYN_mutex);

    LOG("VIP", 5, this << ": Sending SYN and sleeping");

#ifdef WTF
    LOG("VIP", 1, "outgoing: ");
    for(int i = 0; i < n; i++) {
        fprintf(stderr, "%x", outgoing[i]);
    }
#endif

    if(sendto(skt, (char*)outgoing, n, 0, (sockaddr*)&remoteaddr,
              (socklen_t)sizeof(sockaddr_in)) == -1)
        {
            LOG("VIP", 5, this << ": sendto() failed " << strerror(errno));
            return;
        }

    // Absolute deadline for the wait: now + VIP_CONNECT_WAIT_TIME seconds.
    boost::xtime xt;
    boost::xtime_get(&xt, boost::TIME_UTC);
    xt.sec += VIP_CONNECT_WAIT_TIME;

    // The predicate is re-checked under SYN_mutex, so a wakeup issued by
    // wakeupForSYN() before this point is not lost.
    SYN_cond.timed_wait(lk, xt, WaitForConnection(&state));

    LOG("VIP", 4, this << ": Woke up, state is " << state);
}

// Wake every thread blocked in sleepForSYN() so it re-checks the state.
// The notification is issued under SYN_mutex, the mutex the waiter's
// predicate is evaluated under.
void Connection::wakeupForSYN()
{
    boost::mutex::scoped_lock guard(SYN_mutex);
    SYN_cond.notify_all();
}

/*
  Queue an outgoing message on the protocol selected by m->ToS and wake
  the sender thread.

  Throws std::runtime_error("Disconnected") if the connection is not
  connected.  A message whose ToS names no installed protocol is not queued
  anywhere, but the sender is still woken.
*/
void Connection::queueMsg(Message* m)
{
    if(!isConnected())
        throw std::runtime_error("Disconnected");

    const bool knownProto = (m->ToS < VIP_PROTOSLOTS) && protos[m->ToS];
    if(knownProto)
        protos[m->ToS]->queueData(m);

    wakeupSender();
}

/*
  Body of the per-connection sender thread.

  Runs until killSender() sets dieSenderDie or the state becomes Closed.
  Each pass:
  - If Lingering and no protocol has queued data, switch to Closed and
    stop.
  - Otherwise sleep in idle() for computeWaitTime() and notify disconCB if
    the state became Closed meanwhile.
  - Then flush() pending data.
  - If nothing has been received for VIP_CONNECTION_TIMEOUT seconds, close
    the connection without lingering, remove it from the socket
    multiplexer, and notify disconCB.

  If the loop ends in the Closed state, cleanUp() runs (RST + protocol
  teardown).
  NOTE(review): the Lingering -> Closed transition at the top of the loop
  breaks out before the disconCB notification below.  Confirm that this is
  intended.
*/
void Connection::senderLoop()
{
    dieSenderDie = false;
    while(!dieSenderDie && state != Closed) {
        if(state == Lingering) {
            // A lingering close finishes once every protocol has drained.
            bool dataLeft = false;
            for(int i = 0; i < VIP_PROTOSLOTS; i++) {
                if(protos[i]) {
                    if(protos[i]->hasQueuedData()) {
                        dataLeft = true;
                        break;
                    }
                }
            }
            LOG("VIP", 5, this << ": data left is " << dataLeft);
            if(!dataLeft) {
                state = Closed;
                break;
            }
        }

        idle(computeWaitTime());

        LOG("VIP", 5, this << ": state is " << state);

        // close() may have run (from another thread) while we slept.
        if(state == Closed && disconCB) disconCB->notifyDisconnected(this);
        if(state == Closed || dieSenderDie) break;

        flush();

        if((getTimer() - lastRecvTime) > VIP_CONNECTION_TIMEOUT) {
            LOG("VIP", 1, this << ": connection timed out!");
            close(false, false);
            sktmpx->removeConnection(&remoteaddr);

            if(disconCB) disconCB->notifyDisconnected(this);
        }
    }

    LOG("VIP", 4, this << ": sending thread ending");

    // Exiting via killSender() (state not Closed) skips cleanUp().
    if(state == Closed) cleanUp();
}


/*
  Decide how long the sender thread should sleep before the next flush().

  - During the handshake, return VIP_CONNECT_WAIT_TIME.
  - Once Closed, return 0.
  - Otherwise start from the smallest desiredWaitTime() of all protocols,
    then override it as follows:
    * If the congestion window has less than one packet of room, wait
      getHighRTT().
    * Otherwise, if the low-latency protocol is active and CANSEND allows
      fewer than MINCANSEND bytes, wait that protocol's desiredWaitTime().
*/
double Connection::computeWaitTime()
{
    if(state == Opening || state == WaitingForReply) return VIP_CONNECT_WAIT_TIME;
    if(state == Closed) return 0;

    double waittime = 1000000000;
    for(int i = 0; i < VIP_PROTOSLOTS; i++) {
        if(protos[i]) {
            double t = protos[i]->desiredWaitTime();
            if(t < waittime) waittime = t;
        }
    }

    unsigned int diff;
    unsigned int curflightSize;
    {
        boost::recursive_mutex::scoped_lock lk(cwnd_mutex);
        boost::recursive_mutex::scoped_lock lk2(flightSize_mutex);

        curflightSize = flightSize;
        // flightSize can exceed cwnd.  fastRecovery() may set cwnd below the
        // current flightSize, and dispatchData() adds to flightSize without
        // consulting cwnd.  Clamp to 0 instead of letting the unsigned
        // subtraction wrap around to a huge "free window".
        diff = (cwnd > curflightSize) ? (cwnd - curflightSize) : 0;
    }

    if(LLPactive) {
        double dwt = protos[VIP_LOWLATENCY]->desiredWaitTime();
        int canSend = (int)CANSEND(SRTT, dwt, biggestFlight, RTTVAR,
                                   bestPing, curflightSize);

        if(canSend < MINCANSEND || diff < VIP_MAXPKTSIZE) {
            if(diff < VIP_MAXPKTSIZE) {
                waittime = getHighRTT();
            } else {
                waittime = dwt;
            }
            LOG("VIP", 5, this << ": diff " << diff << " waittime " << waittime);
        }
    } else {
        if(diff < VIP_MAXPKTSIZE) waittime = getHighRTT();
    }

    LOG("VIP", 5, this << ": waittime " << waittime);

    return waittime;
}


// timed_wait() predicate used by idle().  It is true once wakeupSender()
// has set the watched needSender flag.
struct NeedSenderPred
{
    bool* ns;

    NeedSenderPred(bool* b) : ns(b) { }

    bool operator()()
    {
        const bool requested = *ns;
        return requested;
    }
};

/*
  Sleep until `waittime` seconds pass or wakeupSender() sets needSender,
  whichever comes first.  A non-positive waittime returns immediately.

  After an actual sleep, if no ack has arrived within getRTO(), flightSize
  is reduced by one VIP_MAXPKTSIZE (floored at 0) and lastAckTime is reset.
  Presumably this keeps a lost ack from holding the window shut forever.
  needSender is cleared before returning from a positive wait.
*/
void Connection::idle(double waittime)
{
    if(waittime > 0)
    {
        boost::xtime xt;
        boost::xtime_get(&xt, boost::TIME_UTC);

        xt.sec += (long int)floor(waittime);
        xt.nsec += (int)((waittime - floor(waittime)) * 1000000000.0);
        // nsec must stay within [0, 1e9).  A value of exactly 1e9 is already
        // out of range, so carry on >= rather than >.
        if(xt.nsec >= 1000000000) { xt.nsec -= 1000000000; xt.sec++; }

        LOG("VIP", 5, this << ": needSender " << needSender);

        boost::mutex::scoped_lock lk(sender_mutex);

        if(! needSender) {
            LOG("VIP", 5, this << ": sleeping for " << waittime);
            sender_cond.timed_wait(lk, xt, NeedSenderPred(&needSender));
            {
                double now = getTimer();
                boost::recursive_mutex::scoped_lock lk2(flightSize_mutex);
                if((lastAckTime + getRTO()) < now) {
                    if(flightSize > VIP_MAXPKTSIZE) flightSize -= VIP_MAXPKTSIZE;
                    else flightSize = 0;
                    lastAckTime = now;
                }
            }

            LOG("VIP", 5, this << ": woke up with needSender " << needSender);
        }
        needSender = false;

        LOG("VIP", 5, this << ": checkpoint 3");
    }
}

/*
  Final teardown, called at the end of senderLoop() when the loop finishes
  in the Closed state.

  - Sends a one-byte VIP_RST packet to the peer, unless close() was called
    with norst.
  - Wakes any close(..., waitForClose=true) caller blocked on close_cond.
  - Deletes all protocol objects and nulls their slots.

  close_mutex stays held until the protocols are deleted, so a woken
  close() caller cannot resume before that finishes.
*/
void Connection::cleanUp()
{
    if(! noRST) {
        unsigned char rst = VIP_RST;

        LOG("VIP", 4, this << ": Sending RST pkt");

        sendto(skt, (char*)&rst, 1,
               0, (struct sockaddr*)&remoteaddr,
               (socklen_t)sizeof(sockaddr_in));
    }

    boost::mutex::scoped_lock lk(close_mutex);
    close_cond.notify_all();

    for(int i = 0; i < VIP_PROTOSLOTS; i++) {
        if(protos[i]) {
            delete protos[i];
            protos[i] = 0;
        }
    }

    LOG("VIP", 6, this << ": blammmo! " << getRefCount());
}

// Ask senderLoop() to exit at its next check of dieSenderDie.  The sender
// is not woken here, and cleanUp() is skipped unless the state is Closed.
void Connection::killSender()
{
    dieSenderDie = true;
}

// Wake the sender thread out of idle().  The flag is set under
// sender_mutex, the mutex idle() holds while it tests needSender and
// starts its timed wait, so the request cannot be missed.
void Connection::wakeupSender()
{
    boost::mutex::scoped_lock guard(sender_mutex);
    needSender = true;
    sender_cond.notify_all();
}

/*
  Build and send packets while the congestion window allows.

  First refreshes LLPactive.  The low-latency protocol counts as active
  while it has had queued data within the last VIP_LLP_SLACK seconds.

  Then, repeatedly:
  - Snapshot cwnd and flightSize.
  - Fill a packet by asking each protocol for a chunk via gatherData().
  - Send the packet with dispatchData().

  If nothing was gathered and the ping deadline (lastSendTime + nextPing)
  has passed, an empty VIP_PING packet is sent instead.  Does nothing
  unless the connection is Connected or Lingering.
*/
void Connection::flush()
{
    if(state != Connected && state != Lingering) return;

    double now = getTimer();

    if(protos[VIP_LOWLATENCY]->hasQueuedData()) lastLLP = now;
    if((now - lastLLP) > VIP_LLP_SLACK) LLPactive = false;
    else LLPactive = true;

    uint8_t outgoing[VIP_MAXPKTSIZE];
    unsigned int ptr = 9;   // bytes 0..8 are filled in by dispatchData()

    bool more;
    unsigned int usecwnd;
    unsigned int useflightSize;
    int canSend = 0;
    do {
        more = false;

        {
            boost::recursive_mutex::scoped_lock lk(cwnd_mutex);
            boost::recursive_mutex::scoped_lock lk2(flightSize_mutex);

            usecwnd = cwnd;
            useflightSize = flightSize;
        }

        bool ackOnly = false;
        for(int i = 0; i < VIP_PROTOSLOTS; i++) {
            if(protos[i]) {
                if(! gatherData(i, outgoing, ptr, usecwnd,
                                useflightSize, more,
                                canSend, ackOnly))
                {
                    break;
                }
            }
        }
        LOG("VIP", 5, this << ": ptr is " << ptr << " more is " << more);

        if(ptr > 9) dispatchData(outgoing, ptr, useflightSize, false);
        if(ptr == 9 && getTimer() > (lastSendTime+nextPing)) {
            LOG("VIP", 4, this << ": sending ping packet " << (getTimer() - lastSendTime));
            dispatchData(outgoing, ptr, useflightSize, true);
        }
        // flightSize may exceed cwnd, so compare without subtracting.  An
        // unsigned difference would wrap around to a huge "free window".
    } while(more
            && usecwnd >= useflightSize + VIP_MAXPKTSIZE
            && canSend < MINCANSEND);

}

/*
  Ask protocol slot `i` for its next chunk and append it to `outgoing` at
  offset `ptr`, advancing `ptr` by the chunk size.

  Byte budget (`canSend`):
  - With the low-latency protocol active, it is the CANSEND() budget,
    clamped to [0, VIP_MAXPKTSIZE].
  - Otherwise it is VIP_MAXPKTSIZE.

  Limits per protocol:
  - The low-latency protocol may fill the packet up to VIP_MAXPKTSIZE.
  - Other protocols are limited to `canSend`.  They are skipped entirely
    (return false) when the congestion window has less than one packet of
    room.

  Starvation handling: when a non-low-latency budget falls below
  MINCANSEND, it is raised to MINCANSEND and the chunk is requested as
  ack-only.  The exception is the 16th consecutive starved call
  (starvationClock), which lets real data through.

  Sets `more` when real (non-ack-only) data was added.  Returns false to
  stop the caller's protocol loop in two cases: the window is full, or the
  low-latency protocol just added data.
*/
bool Connection::gatherData(int i, uint8_t* outgoing, unsigned int& ptr,
                            unsigned int& usecwnd, unsigned int& useflightSize,
                            bool& more, int& canSend, bool& ackOnly)
{
    if(LLPactive) {
        double dwt = protos[VIP_LOWLATENCY]->desiredWaitTime();
        // Use the caller's snapshot of flightSize (taken under
        // flightSize_mutex), as computeWaitTime() does, rather than reading
        // the shared member without the lock.
        canSend = (int)CANSEND(SRTT, dwt, biggestFlight, RTTVAR,
                               bestPing, useflightSize);

        if(canSend > VIP_MAXPKTSIZE) canSend = VIP_MAXPKTSIZE;

        LOG("VIPcanSend", 4, this << ": canSend " << canSend << " dwt=" << dwt
            << " cwnd=" << usecwnd << " " << " bF=" << biggestFlight
            << " rtts=" << SRTT << ", " << (SRTT+RTTVAR*2) << ", " << RTTVAR*4
            << " fs=" << useflightSize << " bestPing=" << bestPing);
    } else {
        canSend = VIP_MAXPKTSIZE;
    }

    if(canSend < 0) canSend = 0;

    assert(canSend <= VIP_MAXPKTSIZE);

    if(i != VIP_LOWLATENCY) {
        // Written without subtraction: flightSize may exceed cwnd, and
        // (usecwnd - useflightSize) would then wrap around.
        if(usecwnd < useflightSize + VIP_MAXPKTSIZE) return false;
        if(canSend < MINCANSEND) {
            canSend = MINCANSEND;

            starvationClock++;
            if(starvationClock == 16) {
                LOG("VIP", 4, this << ": starvation clock kicking in");
            } else {
                ackOnly = true;
            }
        }
    }

    // Space offered to the protocol.  If earlier chunks already filled the
    // packet past the budget, offer nothing.  Otherwise the unsigned
    // subtraction would wrap and getNextChunk() could write far past the end
    // of `outgoing`.
    unsigned int sz;
    if(i == VIP_LOWLATENCY) {
        sz = (ptr < (unsigned int)VIP_MAXPKTSIZE) ? VIP_MAXPKTSIZE - ptr : 0;
    } else {
        sz = (ptr < (unsigned int)canSend) ? canSend - ptr : 0;
    }

    protos[i]->getNextChunk(outgoing + ptr, &sz, ackOnly);

    if(i != VIP_LOWLATENCY && !ackOnly) starvationClock = 0;

    LOG("VIP", 5, this << ": from proto " << i << " adding chunk of size " << sz);
    ptr += sz;
    if(i > VIP_LOWLATENCY) {
        //assert(ptr <= canSend);
    } else {
        assert(ptr <= VIP_MAXPKTSIZE);
    }

    if(sz > 0 && !ackOnly) {
        more = true;
        if(i == VIP_LOWLATENCY) return false;
    }

    return true;
}

/*
  Stamp the packet header and send `outgoing[0..ptr)` to the peer.

  - Writes the 9-byte "cll" header: a VIP_PING or VIP_DATA control, a
    freshly incremented pktCounter, and lastRecvPkt as the ack.
  - Records the send time in sendTimes for RTT sampling.
  - Adds `ptr` bytes to bytesSent and flightSize, raising biggestFlight if
    needed.
  - Data packets reset the ping interval to VIP_CONNECTION_TIMEOUT/2.
    Pings extend it by VIP_PING_WAIT_TIME.

  On return `useflightSize` holds the new flightSize, and `ptr` is reset to
  9, ready for the next packet body.  The result of sendto() is not
  checked.
*/
void Connection::dispatchData(uint8_t* outgoing, unsigned int& ptr,
                              unsigned int& useflightSize, bool ping)
{
    boost::mutex::scoped_lock lk(sendTimes_mutex);
    boost::recursive_mutex::scoped_lock lk2(bytesSent_mutex);
    boost::recursive_mutex::scoped_lock lk3(flightSize_mutex);

    char control;

    if(ping) control = VIP_PING;
    else control = VIP_DATA;

    structpack(outgoing,
               9, "cll",
               control,
               ++pktCounter,
               lastRecvPkt);

    LOG("VIP", 6, this << ": sending with pktcounter " << pktCounter
        << " lastrcvpkt " << lastRecvPkt);

    // Remember when this packet left so its ack can yield an RTT sample.
    TimeEntry te;
    te.pkt = pktCounter;
    te.tm = getTimer();
    sendTimes.push_back(te);

    if(control == VIP_DATA) {
        lastSendTime = te.tm;
        nextPing = VIP_CONNECTION_TIMEOUT/2;
    } else {
        nextPing += VIP_PING_WAIT_TIME;
    }

    bytesSent += ptr;
    flightSize += ptr;
    if(biggestFlight < flightSize) biggestFlight = flightSize;

    useflightSize = flightSize;

    LOG("VIP", 4, this << ": Sending " << ptr << " bytes (pktCounter " << te.pkt << ")");

    sendto(skt, (char*)outgoing, ptr,
           0, (struct sockaddr*)&remoteaddr,
           (socklen_t)sizeof(sockaddr_in));

    // The next packet body starts right after the header again.
    ptr = 9;
}


/*
  Account for `bytes` acknowledged by the peer.

  Always records the ack time and reduces flightSize (never below 0).
  When `resetRecovery` is true, the congestion window also grows and both
  recovery flags are cleared:
  - While cwnd <= ssthresh, cwnd grows by `bytes`.
  - Otherwise it grows by bytes*bytes/cwnd, with a minimum of 1.
*/
void Connection::ackBytes(unsigned int bytes, bool resetRecovery)
{
    boost::recursive_mutex::scoped_lock cwndLock(cwnd_mutex);
    boost::recursive_mutex::scoped_lock flightLock(flightSize_mutex);
    boost::recursive_mutex::scoped_lock ssthreshLock(ssthresh_mutex);

    lastAckTime = getTimer();

    if(bytes <= flightSize) flightSize -= bytes;
    else flightSize = 0;

    LOG("VIP", 5, this << ": " << bytes << " acked, flightSize is " << flightSize);

    if(!resetRecovery) return;

    if(cwnd <= ssthresh) {
        cwnd += bytes;
    } else if((bytes * bytes) < cwnd) {
        // The proportional increment would round down to zero.
        cwnd++;
    } else {
        cwnd += (bytes * bytes) / cwnd;
    }

    inLossRecovery = false;
    inFastRecovery = false;
}

/*
  React to a detected packet loss.  Only the first loss of an episode
  counts; ackBytes(..., true) clears inLossRecovery again.

  - ssthresh becomes max(flightSize/2, 2 packets).
  - cwnd and biggestFlight collapse to 2 packets.
  - flightSize is reset to 0.
*/
void Connection::notifyLoss()
{
    if(inLossRecovery) return;

    boost::recursive_mutex::scoped_lock cwndLock(cwnd_mutex);
    boost::recursive_mutex::scoped_lock flightLock(flightSize_mutex);
    boost::recursive_mutex::scoped_lock ssthreshLock(ssthresh_mutex);

    if((flightSize / 2) > (VIP_MAXPKTSIZE * 2))
        ssthresh = flightSize / 2;
    else
        ssthresh = VIP_MAXPKTSIZE * 2;

    LOG("VIP", 5, this << " set ssthresh to " << ssthresh);

    cwnd = VIP_MAXPKTSIZE * 2;

    flightSize = 0;
    biggestFlight = cwnd;

    inLossRecovery = true;
}

/*
  Enter fast recovery, unless already in fast or loss recovery.

  - ssthresh becomes max(flightSize/2, 2 packets).
  - cwnd becomes ssthresh plus 3 packets.
  - flightSize is left untouched.
*/
void Connection::fastRecovery()
{
    if(inFastRecovery || inLossRecovery) return;

    boost::recursive_mutex::scoped_lock cwndLock(cwnd_mutex);
    boost::recursive_mutex::scoped_lock flightLock(flightSize_mutex);
    boost::recursive_mutex::scoped_lock ssthreshLock(ssthresh_mutex);

    LOG("VIP", 5, this << " In fast recovery");

    if((flightSize / 2) > (VIP_MAXPKTSIZE * 2))
        ssthresh = flightSize / 2;
    else
        ssthresh = VIP_MAXPKTSIZE * 2;

    cwnd = ssthresh + 3 * VIP_MAXPKTSIZE;

    inFastRecovery = true;
}

/*
  Report the peer's IPv4 address and port from remoteaddr.

  ip[] receives the four octets in dotted-quad order (ip[0] is the first
  octet).
  NOTE(review): `port` is sin_port as stored, i.e. in network byte order.
  getPeerHostname() returns the port in host order instead.  Confirm which
  one callers expect.
*/
void Connection::getPeerAddress(unsigned char ip[4], unsigned int& port)
{
    // s_addr is in network byte order, so its bytes in memory are already
    // the octets in order.  Copying the bytes works on any host; shifting
    // the integer only gave the right order on little-endian machines.
    memcpy(ip, &remoteaddr.sin_addr.s_addr, 4);
    port = remoteaddr.sin_port;
}

/*
  Look up the peer's host name and numeric port via getnameinfo_wrapper().
  Both buffers start empty.  If the lookup leaves them untouched, the
  result is hostname "" and port 0.
*/
void Connection::getPeerHostname(std::string& hostname, unsigned int& port)
{
    char hostBuf[255] = "";
    char servBuf[255] = "";

    getnameinfo_wrapper(reinterpret_cast<struct sockaddr*>(&remoteaddr),
                        sizeof(struct sockaddr_in),
                        hostBuf, sizeof(hostBuf),
                        servBuf, sizeof(servBuf),
                        NI_DGRAM | NI_NUMERICSERV);

    hostname.assign(hostBuf);
    port = atoi(servBuf);
}

/*
  Report the local IPv4 address and port the socket is bound to.

  ip[] receives the four octets in dotted-quad order.  If getsockname()
  fails, the address reported is 0.0.0.0 with port 0.
  NOTE(review): like getPeerAddress(), `port` is left in network byte
  order.  Confirm that callers expect this.
*/
void Connection::getLocalAddress(unsigned char ip[4], unsigned int& port)
{
    struct sockaddr_in tmp;
    socklen_t socksize = sizeof(sockaddr_in);

    // Zero first so a failed getsockname() does not expose stack garbage.
    memset(&tmp, 0, sizeof(tmp));
    getsockname(skt, (struct sockaddr*)&tmp, &socksize);

    // s_addr is in network byte order, so its in-memory bytes are the octets
    // in order on any host, regardless of endianness.
    memcpy(ip, &tmp.sin_addr.s_addr, 4);
    port = tmp.sin_port;
}

/*
  Look up the host name and numeric port of the socket's local address via
  getnameinfo_wrapper().

  If getsockname() fails, returns hostname "" and port 0 without
  attempting a lookup.
*/
void Connection::getLocalHostname(std::string& hostname, unsigned int& port)
{
    char host[255] = "";
    char serv[255] = "";

    struct sockaddr_in tmp;
    socklen_t socksize = sizeof(sockaddr_in);

    memset(&tmp, 0, sizeof(tmp));
    // Do not hand an uninitialised sockaddr to getnameinfo().
    if(getsockname(skt, (struct sockaddr*)&tmp, &socksize) != 0) {
        hostname = host;
        port = 0;
        return;
    }

    getnameinfo_wrapper((struct sockaddr*)&tmp,
                sizeof(struct sockaddr_in),
                host, sizeof(host),
                serv, sizeof(serv),
                NI_DGRAM | NI_NUMERICSERV);


    hostname = host;
    port = atoi(serv);
}


/*
  Deliver a received message.

  If a message callback is installed, the message is wrapped in a MsgTask
  and added to the default task queue.  Otherwise it is appended to
  queuedRecv for recv(), and any blocked recv() is woken.

  `callback` is tested once without the lock and again under
  queuedRecv_mutex.  This means a callback that setCallback() installs
  concurrently is still honoured.
*/
void Connection::doCallback(Message* m)
{
    if(!callback) {
        boost::mutex::scoped_lock guard(queuedRecv_mutex);
        if(!callback) {
            LOG("VIP", 4, "queuing message");
            queuedRecv.push_back(vRef<Message>(m, true));
            queuedRecv_cond.notify_all();
            return;
        }
        // setCallback() ran in the meantime; dispatch while holding the lock.
        LOG("VIP", 4, "adding message callback");
        VUtil::TaskQueue::defaultTQ().addTask(new MsgTask(callback, m));
        return;
    }

    LOG("VIP", 4, "adding message callback");
    VUtil::TaskQueue::defaultTQ().addTask(new MsgTask(callback, m));
}

/*
  Copy queued, received payload for (ToS, channel) into `buf`.

  Returns -1 if the connection is not connected.  Otherwise returns the
  number of bytes copied, which is at most `bufsize`.

  - A whole message that fits is copied and removed from the queue.
  - A message larger than the remaining space is copied partially, and
    its consumed prefix is erased.
  - For VIP_LOWLATENCY, at most one message is taken per call.

  If `block` is true and nothing was copied, waits on queuedRecv_cond and
  retries.
*/
int Connection::recv(int ToS, int channel, uint8_t* buf,
                     unsigned int bufsize, bool block)
{
    if(! isConnected()) return -1;

    unsigned int ptr = 0;

    boost::mutex::scoped_lock lk(queuedRecv_mutex);

    LOG("VIP", 4, this << ": called recv with " << ToS << " " << channel);

    do {
        unsigned int i = 0;
        while(i < queuedRecv.size() && ptr < bufsize)
        {
            LOG("VIP", 4, this << ": chunk is " << (int)queuedRecv[i]->ToS << " "
                << (int)queuedRecv[i]->channel);
            if(queuedRecv[i]->ToS == ToS && queuedRecv[i]->channel == channel)
            {
                bool consumed = false;
                if(queuedRecv[i]->payload.size() <= (bufsize - ptr)) {
                    memcpy(buf + ptr,
                           queuedRecv[i]->payload.data(),
                           queuedRecv[i]->payload.size());
                    ptr += queuedRecv[i]->payload.size();
                    // Remove the message just copied.  This is element i,
                    // not the queue head, which may belong to another
                    // ToS/channel.
                    queuedRecv.erase(queuedRecv.begin() + i);
                    consumed = true;
                } else {
                    memcpy(buf + ptr, queuedRecv[i]->payload.data(),
                           bufsize - ptr);
                    queuedRecv[i]->payload.replace(0, bufsize - ptr, "");
                    ptr += (bufsize - ptr);
                }

                if(ToS == VIP_LOWLATENCY) break;
                // After an erase, the next message has shifted into slot i.
                if(consumed) continue;
            }
            i++;
        }
        LOG("VIP", 4, this << ": wait? " << block << " " << ptr);
        if(block && ptr == 0) {
            queuedRecv_cond.wait(lk);
        }
    } while(block && ptr == 0);

    return ptr;
}

/*
  Install `cb` as the message callback.

  Messages that were queued before a callback existed are handed to the
  task queue first, in arrival order, and the queue is emptied.  All of
  this happens under queuedRecv_mutex, so doCallback()'s locked re-check
  sees either the old queue or the new callback.
*/
void Connection::setCallback(MsgCallback* cb)
{
    boost::mutex::scoped_lock guard(queuedRecv_mutex);

    for(unsigned int idx = 0; idx != queuedRecv.size(); ++idx)
        VUtil::TaskQueue::defaultTQ().addTask(new MsgTask(cb, queuedRecv[idx]));
    queuedRecv.clear();

    callback = cb;
}

/*
  Begin closing the connection.

  linger        If true, enter Lingering so senderLoop() keeps sending until
                every protocol's queue is empty.  Otherwise go straight to
                Closed.
  waitForClose  If true, block until the connection is no longer
                Lingering.
  norst         If true, cleanUp() does not send a VIP_RST packet to the
                peer.

  Wakes both a pending sendSYN() and the sender thread so they notice the
  state change.
*/
void Connection::close(bool linger, bool waitForClose, bool norst)
{
    if(linger) state = Lingering;
    else state = Closed;

    noRST = norst;

    wakeupForSYN();
    wakeupSender();

    if(waitForClose) {
        // cleanUp() signals close_cond while holding close_mutex.  Take the
        // mutex before testing `state` so the notification cannot slip in
        // between the test and the wait.  Locking inside the loop allowed
        // exactly that, leaving us blocked forever.
        boost::mutex::scoped_lock lk(close_mutex);
        while(state == Lingering) {
            close_cond.wait(lk);
        }
    }
}

unsigned int Connection::queuedBytes(int ToS, int channel)
{
    if(protos[ToS]) return protos[ToS]->queuedBytes(channel);
    else return 0;
}
