#ifndef _VIP_CONNECTION_HH_
#define _VIP_CONNECTION_HH_

#include <vos/vip/vipdefs.hh>

#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif

#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif

#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif

#include <iostream>
#include <string>
#include <vector>
#include <map>
#include <deque>

#include <boost/thread/thread.hpp>
#include <boost/thread/condition.hpp>

#include <vos/vutil/structpack.hh>
#include <vos/vutil/refcount.hh>
#include <vos/vutil/taskqueue.hh>

// Forward declarations of VIP types. They appear ahead of the
// <vos/vip/protocol.hh> include that follows, presumably so that header can
// name these types before their full definitions are seen -- TODO confirm
// against protocol.hh.
namespace VIP {
    struct OutgoingMsg;
    struct Message;
    struct SendingCompletedCallback;
    class Connection;
    class SocketMultiplexer;
}

#include <vos/vip/protocol.hh>

namespace VIP {
    /**
     * A reference-counted message record: a payload string plus the
     * addressing and bookkeeping fields declared below.
     *
     * The constructor is defined outside this header, so initial field
     * values are not visible here.
     */
    struct VIP_API Message : public VUtil::RefCounted
    {
        Message();

        /** Ref-counted handle to the Connection this message belongs to. */
        VUtil::vRef<Connection> connection;
        /** NOTE(review): presumably a "type of service" selector; the same
            name is used as a parameter of Connection::recv() and
            Connection::queuedBytes() -- confirm meaning and valid range. */
        uint8_t ToS;
        /** Channel number; presumably pairs with ToS as above -- confirm. */
        uint8_t channel;
        /** Message identifier (how it is assigned is not visible here). */
        uint32_t msgid;
        /** Message body bytes. */
        std::string payload;
        /** Optional hook of type SendingCompletedCallback. Raw pointer:
            ownership and whether it may be null are not stated in this
            header -- TODO confirm. */
        SendingCompletedCallback* completedCallback;
    };

    /**
     * Abstract interface for objects that want to be handed messages.
     * Implementers override handleMessage(); MsgTask::doTask() in this
     * header is one place it gets invoked.
     */
    struct VIP_API MsgCallback
    {
        /** Virtual so implementations can be destroyed through this type. */
        virtual ~MsgCallback()
        {
        }

        /** Called with the message to process. */
        virtual void handleMessage(Message* m) = 0;
    };

    /**
     * Abstract interface for disconnect notifications. A Connection stores
     * one of these via Connection::setDisconnectCallback().
     */
    struct VIP_API DisconnectCallback
    {
        /** Virtual so implementations can be destroyed through this type. */
        virtual ~DisconnectCallback()
        {
        }

        /** Called with the Connection the notification is about. */
        virtual void notifyDisconnected(Connection* m) = 0;
    };

    /**
     * Abstract interface for send-completion notifications. A Message can
     * carry a pointer to one of these in Message::completedCallback.
     */
    struct VIP_API SendingCompletedCallback
    {
        /** Virtual so implementations can be destroyed through this type. */
        virtual ~SendingCompletedCallback()
        {
        }

        /** Called with the Message the notification is about. */
        virtual void notifySendingCompleted(Message* m) = 0;
    };

    class MsgTask : public VUtil::Task
    {
    private:
        MsgCallback* callback;
        VUtil::vRef<Message> msg;
    public:
        MsgTask(MsgCallback* c, Message* m) : callback(c), msg(m, true)
            { }
        virtual void doTask()
            {
                callback->handleMessage(msg);
            }
    };

    /**
     * Plain record pairing a 32-bit packet value with a floating-point time.
     * Used as the element type of Connection::sendTimes.
     */
    struct TimeEntry
    {
        /** NOTE(review): presumably a packet counter/sequence value (compare
            Connection::pktCounter) -- confirm. */
        uint32_t pkt;
        /** Timestamp; units and epoch are not visible here -- TODO confirm. */
        double tm;
    };

    /**
     * A single VIP connection. It is constructed from a socket descriptor,
     * a remote address and a SocketMultiplexer (see the constructor).
     *
     * This header only declares the interface: every member function that
     * is not defined inline below is implemented elsewhere, so the comments
     * here describe signatures rather than behaviour.
     *
     * NOTE(review): several fields are declared next to a mutex of the same
     * name, yet the inline accessors below read state, SRTT, RTTVAR, RTO,
     * cwnd and bytesSent without locking -- confirm callers tolerate this.
     */
    class VIP_API Connection : public VUtil::RefCounted
    {
    public:
        /** Life-cycle states; isConnected() tests for Connected. */
        enum ConnectState { Opening, WaitingForReply,
                            Connected, Lingering, Closed };
    private:
        // Socket descriptor and remote address; presumably initialised from
        // the constructor arguments of the same names -- confirm.
        int skt;
        struct sockaddr_in remoteaddr;

        // Current life-cycle state, exposed through getState().
        ConnectState state;

        // Hooks installed through setCallback() / setDisconnectCallback().
        // Raw pointers: ownership is not stated in this header.
        MsgCallback* callback;
        DisconnectCallback* disconCB;

        // Mutex/condition pair; by name it belongs to the SYN handshake
        // (sleepForSYN()/wakeupForSYN()) -- confirm in the implementation.
        boost::mutex SYN_mutex;
        boost::condition SYN_cond;

        // Mutex/condition pair and flag; by name they belong to the sender
        // loop (senderLoop()/wakeupSender()) -- confirm.
        boost::mutex sender_mutex;
        boost::condition sender_cond;
        bool needSender;

        // One Protocol pointer per protocol slot.
        Protocol* protos[VIP_PROTOSLOTS];

        // NOTE(review): presumably the stop request set by killSender().
        bool dieSenderDie;

        // Queue of (packet, time) records and its mutex.
        boost::mutex sendTimes_mutex;
        std::deque<TimeEntry> sendTimes;

        uint32_t pktCounter;
        uint32_t lastRecvPkt;

        // Round-trip timing values returned by getSRTT()/getHighRTT()/
        // getRTO(). NOTE(review): the names match the SRTT/RTTVAR/RTO
        // variables of RFC 6298; units are not visible here -- confirm.
        double SRTT;
        double RTTVAR;
        double RTO;

        double bestPing;

        // Counters, each declared after a recursive mutex of the same name.
        // NOTE(review): names match TCP congestion-control terms (cwnd,
        // flight size, ssthresh) -- confirm the algorithm in the .cc file.
        boost::recursive_mutex cwnd_mutex;
        unsigned int cwnd;

        boost::recursive_mutex bytesSent_mutex;
        unsigned int bytesSent;

        boost::recursive_mutex flightSize_mutex;
        unsigned int flightSize;
        unsigned int biggestFlight;

        boost::recursive_mutex ssthresh_mutex;
        unsigned int ssthresh;

        bool inLossRecovery;
        bool inFastRecovery;

        double lastAckTime;

        bool LLPactive;
        int starvationClock;

        // Queue of received messages with its mutex/condition pair.
        boost::mutex queuedRecv_mutex;
        boost::condition queuedRecv_cond;
        std::deque< VUtil::vRef<Message> > queuedRecv;

        // Mutex/condition pair; by name it belongs to close() -- confirm.
        boost::mutex close_mutex;
        boost::condition close_cond;

        double lastSendTime;
        double lastRecvTime;
        double nextPing;

        double lastLLP;

        // NOTE(review): presumably mirrors the norst argument of close().
        bool noRST;

        // Multiplexer passed to the constructor (raw pointer, not owned as
        // far as this header shows).
        SocketMultiplexer* sktmpx;

        // --- SYN helpers -------------------------------------------------
        // All take raw byte buffers; "incoming"/"isz" is a buffer and its
        // size, "outgoing"/"osz" a buffer and a pointer to a size.

        void makeSYN(uint8_t* outgoing, unsigned int* osz);

        void getOffsetsSYN(uint8_t* incoming, unsigned int isz,
                           uint16_t offsets[VIP_PROTOSLOTS]);

        bool replySYN(uint8_t* incoming, unsigned int isz,
                      uint8_t* outgoing, unsigned int* osz);

        void setupWithSYN(uint8_t* incoming, unsigned int isz);

        // --- sender helpers ----------------------------------------------

        double computeWaitTime();
        void idle(double waittime);

        // The reference parameters are in/out values whose meaning is
        // defined by the implementation.
        bool gatherData(int channel, uint8_t* outgoing, unsigned int& ptr,
                        unsigned int& usecwnd, unsigned int& useflightsize,
                        bool& more, int& canSend, bool& ackOnly);
        void dispatchData(uint8_t* outgoing, unsigned int& ptr,
                          unsigned int& useflightSize, bool ping);

        void cleanUp();

    public:

        /**
         * @param skt         socket descriptor
         * @param remoteaddr  remote address
         * @param sm          socket multiplexer
         */
        Connection(int skt, sockaddr_in* remoteaddr, SocketMultiplexer* sm);

        ~Connection();

        /** True exactly when the state is Connected. */
        bool isConnected() const { return (state == Connected); }

        void queueMsg(Message* m);
        void flush();

        void setCallback(MsgCallback* cb);
        /** Stores cb as the disconnect hook; no locking is performed. */
        void setDisconnectCallback(DisconnectCallback* cb)
            { disconCB = cb; }

        void splitIntoChunks(uint8_t* buf, unsigned int sz);

        void sendSYN();
        bool handleSYN(uint8_t* buf, unsigned int sz);

        void sleepForSYN(uint8_t* buf, unsigned int sz);
        void wakeupForSYN();

        void doCallback(Message* m);

        void killSender();
        void senderLoop();
        void wakeupSender();

        /** Current SRTT value. */
        double getSRTT() const { return SRTT; }
        /**
         * SRTT + 2*RTTVAR when SRTT is positive; otherwise the fixed
         * fallback 60 (units not visible in this header).
         */
        double getHighRTT() const
            {
                if (SRTT > 0)
                    return SRTT + RTTVAR*2;
                return 60.0;
            }
        /** Current RTO value. */
        double getRTO() const { return RTO; }
        /** Current cwnd value (read without taking cwnd_mutex). */
        unsigned int getCWND() const { return cwnd; }

        void ackBytes(unsigned int bytes, bool resetRecovery);
        /** Current bytesSent value (read without taking bytesSent_mutex). */
        unsigned int totalBytesSent() const { return bytesSent; }

        void notifyLoss();
        void fastRecovery();

        // Address queries: results are written to the output parameters.
        void getLocalAddress(unsigned char ipaddr[4], unsigned int& port);
        void getLocalHostname(std::string& hostname, unsigned int& port);

        void getPeerAddress(unsigned char ipaddr[4], unsigned int& port);
        void getPeerHostname(std::string& hostname, unsigned int& port);

        /**
         * NOTE(review): the meaning of the int return value (byte count vs.
         * error code) is not visible here -- confirm.
         */
        int recv(int ToS, int channel, uint8_t* data,
                 unsigned int maxsize, bool block);

        /**
         * Defaults: linger = true, waitForClose = false, norst = false.
         */
        void close(bool linger = true,
                   bool waitForClose = false,
                   bool norst = false);

        unsigned int queuedBytes(int ToS, int channel);

        /** Current life-cycle state. */
        ConnectState getState() const { return state; }
    };
}

#endif
