image-20220312222023662

WebServer

缓冲区 Buffer

buffer.h

#ifndef BUFFER_H
#define BUFFER_H
#include <cstring>   //perror
#include <iostream>
#include <unistd.h>  // write
#include <sys/uio.h> //readv
#include <vector> //readv
#include <atomic>  // 原子类型
#include <assert.h>
class Buffer {
public:
    Buffer(int initBuffSize = 1024);
    ~Buffer() = default;

    size_t WritableBytes() const;       // 可写字节数 :总长度-写位置
    size_t ReadableBytes() const ;      // 可读字节数 :写位置-读位置
    size_t PrependableBytes() const;    // 预置字节数 :读位置

    const char* Peek() const;           // 第一个数据字节
    void EnsureWriteable(size_t len);   // 确认可写长度为 len
    void HasWritten(size_t len);        // 已经写入了 len 个字节 调用 HasWritten 函数更新写位置

    void Retrieve(size_t len);          // 将读位置前移 len
    void RetrieveUntil(const char* end);// 将读位置前移至 const char* end

    void RetrieveAll() ;                // 缓冲区重置
    std::string RetrieveAllToStr();     // 返回现有缓冲区内容,并将缓冲区重置

    const char* BeginWriteConst() const;// 返回第一个可写位置
    char* BeginWrite();                 // 返回第一个可写位置

    void Append(const std::string& str);               
    void Append(const char* str, size_t len);           // 核心函数 另外三个都是调用的这个 将 str 起始位置长度为 len 的字符串写入到缓冲区 
    void Append(const void* data, size_t len);
    void Append(const Buffer& buff);

    ssize_t ReadFd(int fd, int* Errno);                 // 从文件描述符读取数据到缓冲区  
    ssize_t WriteFd(int fd, int* Errno);                // 将缓冲区数据写入到文件描述符

private:
    char* BeginPtr_();  // 缓冲区首地址
    const char* BeginPtr_() const; // 缓冲区首地址
    void MakeSpace_(size_t len);   // 获得空间 len

    std::vector<char> buffer_;  // 缓冲区 大小为 initBuffSize
    std::atomic<std::size_t> readPos_; // 缓冲区读地址(下标)(偏移量) 设置为原子类型,保证线程安全
    std::atomic<std::size_t> writePos_; // 缓冲区写地址(下标)(偏移量)
};

#endif //BUFFER_H

buffer.cpp

#include "buffer.h"

Buffer::Buffer(int initBuffSize) : buffer_(initBuffSize), readPos_(0), writePos_(0) {}

size_t Buffer::ReadableBytes() const {
    return writePos_ - readPos_;
}
size_t Buffer::WritableBytes() const {
    return buffer_.size() - writePos_;
}

size_t Buffer::PrependableBytes() const {
    return readPos_;
}

const char* Buffer::Peek() const {
    return BeginPtr_() + readPos_;
}

void Buffer::Retrieve(size_t len) {
    assert(len <= ReadableBytes());
    readPos_ += len;
}

void Buffer::RetrieveUntil(const char* end) {
    assert(Peek() <= end );
    Retrieve(end - Peek());
}

void Buffer::RetrieveAll() {
    bzero(&buffer_[0], buffer_.size());
    readPos_ = 0;
    writePos_ = 0;
}

std::string Buffer::RetrieveAllToStr() {
    std::string str(Peek(), ReadableBytes());  // string s(cstr,char_len) : 将缓冲区的内容构造出字符串 str 返回
    RetrieveAll();                             // 再将缓冲区重置
    return str;
}

const char* Buffer::BeginWriteConst() const {
    return BeginPtr_() + writePos_;
}

char* Buffer::BeginWrite() {
    return BeginPtr_() + writePos_;
}

void Buffer::HasWritten(size_t len) {
    writePos_ += len;
} 

void Buffer::Append(const std::string& str) {
    Append(str.data(), str.length());                // data 方法返回首地址 length 方法返回长度
}

void Buffer::Append(const void* data, size_t len) {
    assert(data);
    Append(static_cast<const char*>(data), len);     // void* 类型要静态转换为 const char*
}

void Buffer::Append(const char* str, size_t len) {
    assert(str);
    EnsureWriteable(len);                             // 确认可写长度 len
    std::copy(str, str + len, BeginWrite());          // 调用copy 进行写入
    HasWritten(len);                                  // 调用 HasWritten 更新写位置
}

void Buffer::Append(const Buffer& buff) {
    Append(buff.Peek(), buff.ReadableBytes());       // 将另一个缓冲区的内容写入到当前缓冲区
}

void Buffer::EnsureWriteable(size_t len) {
    if(WritableBytes() < len) {              // 如果可写长度(写位置到缓冲区结束)小于 len
        MakeSpace_(len);                     // 获得空间 len
    }
    assert(WritableBytes() >= len);
}

ssize_t Buffer::ReadFd(int fd, int* saveErrno) {
    char buff[65535];
    struct iovec iov[2];
    const size_t writable = WritableBytes();
    /* 分散读, 保证数据全部读完 */
    iov[0].iov_base = BeginPtr_() + writePos_;
    iov[0].iov_len = writable;
    iov[1].iov_base = buff;
    iov[1].iov_len = sizeof(buff);

    const ssize_t len = readv(fd, iov, 2);
    if(len < 0) {               // 读错误
        *saveErrno = errno;
    }
    else if(static_cast<size_t>(len) <= writable) {      // 读取字节数小于缓冲区剩余可写字节数
        writePos_ += len;
    }
    else {                             // buff 中有数据
        writePos_ = buffer_.size();    // 已经写满,写偏移到达结尾位置      
        Append(buff, len - writable);  // 将 buff 中的数据写入缓冲区  
    }
    return len;                        // 返回读取字节长度
}

ssize_t Buffer::WriteFd(int fd, int* saveErrno) {
    size_t readSize = ReadableBytes();               // 缓冲区中的字节数
    ssize_t len = write(fd, Peek(), readSize);       // 将缓冲区中的数据写入文件描述符
    if(len < 0) {
        *saveErrno = errno;
        return len;
    } 
    readPos_ += len;                                 // 更新可读位置
    return len;
}

char* Buffer::BeginPtr_() {                    // 缓冲区首地址
    return &*buffer_.begin();
}

const char* Buffer::BeginPtr_() const {        //  缓冲区首地址
    return &*buffer_.begin();
}

void Buffer::MakeSpace_(size_t len) {
    if(WritableBytes() + PrependableBytes() < len) {  // 缓冲区剩余长度小于 len的话
        buffer_.resize(writePos_ + len + 1);          // 扩容缓冲区
    } 
    else {                                            // 否则调整缓冲区数据位置 使空位置连续
        size_t readable = ReadableBytes();
        std::copy(BeginPtr_() + readPos_, BeginPtr_() + writePos_, BeginPtr_());  // 把一个序列拷贝到一个容器中去 就是把缓冲区原有数据移动到vector的起始地址处
        readPos_ = 0;   // 更新读位置
        writePos_ = readPos_ + readable; // 更新写位置
        assert(readable == ReadableBytes());  
    }
}

日志系统

log.h

#ifndef LOG_H
#define LOG_H

#include <mutex>
#include <string>
#include <thread>
#include <sys/time.h>
#include <string.h>
#include <stdarg.h>           // vastart va_end
#include <assert.h>
#include <sys/stat.h>         //mkdir
#include "blockqueue.h"
#include "../buffer/buffer.h"

class Log {
public:
    void init(int level, const char* path = "./log", 
                const char* suffix =".log",
                int maxQueueCapacity = 1024);

    static Log* Instance();
    static void FlushLogThread();

    void write(int level, const char *format,...);
    void flush();

    int GetLevel();
    void SetLevel(int level);
    bool IsOpen() { return isOpen_; }
    
private:
    Log();                                           // 私有化构造函数
    void AppendLogLevelTitle_(int level);
    virtual ~Log();                                  // 虚析构
    void AsyncWrite_();                              // 异步写

private:

    static const int LOG_PATH_LEN = 256;             // 日志路径长度 
    static const int LOG_NAME_LEN = 256;             // 日志名称长度
    static const int MAX_LINES = 50000;              // 最大行数

    const char* path_;                               // 路径     
    const char* suffix_;                             // 后缀

    int MAX_LINES_;                                  // 最大行

    int lineCount_;                                  // 行数
    int toDay_;                                      // 今天

    bool isOpen_;                                    // 打开标志
 
    Buffer buff_;                                    // 缓冲区
    int level_;                                      // 日志级别
    bool isAsync_;                                   // 异步标志

    FILE* fp_;                                       // 文件指针
    std::unique_ptr<BlockDeque<std::string>> deque_; // 阻塞队列 指针
    std::unique_ptr<std::thread> writeThread_;       // 写线程 指针
    std::mutex mtx_;                                 // 互斥锁
};

#define LOG_BASE(level, format, ...) \
    do {\
        Log* log = Log::Instance();\
        if (log->IsOpen() && log->GetLevel() <= level) {\
            log->write(level, format, ##__VA_ARGS__); \
            log->flush();\
        }\
    } while(0);

#define LOG_DEBUG(format, ...) do {LOG_BASE(0, format, ##__VA_ARGS__)} while(0);       // ##__VA_ARGS__ 可变参数宏
#define LOG_INFO(format, ...) do {LOG_BASE(1, format, ##__VA_ARGS__)} while(0);
#define LOG_WARN(format, ...) do {LOG_BASE(2, format, ##__VA_ARGS__)} while(0);
#define LOG_ERROR(format, ...) do {LOG_BASE(3, format, ##__VA_ARGS__)} while(0);

#endif //LOG_H

log.cpp

#include "log.h"

using namespace std;

Log::Log() {
    lineCount_ = 0;
    isAsync_ = false;
    writeThread_ = nullptr;
    deque_ = nullptr;
    toDay_ = 0;
    fp_ = nullptr;
}

Log::~Log() {
    if(writeThread_ && writeThread_->joinable()) {
        while(!deque_->empty()) {
            deque_->flush();
        };
        deque_->Close();
        writeThread_->join();
    }
    if(fp_) {
        lock_guard<mutex> locker(mtx_);
        flush();
        fclose(fp_);
    }
}

int Log::GetLevel() {
    lock_guard<mutex> locker(mtx_);
    return level_;
}

void Log::SetLevel(int level) {
    lock_guard<mutex> locker(mtx_);
    level_ = level;
}

void Log::init(int level = 1, const char* path, const char* suffix,
    int maxQueueSize) {
    isOpen_ = true;
    level_ = level;
    if(maxQueueSize > 0) {
        isAsync_ = true;
        if(!deque_) {
            unique_ptr<BlockDeque<std::string>> newDeque(new BlockDeque<std::string>);
            deque_ = move(newDeque);
            
            std::unique_ptr<std::thread> NewThread(new thread(FlushLogThread));
            writeThread_ = move(NewThread);
        }
    } else {
        isAsync_ = false;
    }

    lineCount_ = 0;

    time_t timer = time(nullptr);
    struct tm *sysTime = localtime(&timer);
    struct tm t = *sysTime;
    path_ = path;
    suffix_ = suffix;
    char fileName[LOG_NAME_LEN] = {0};
    snprintf(fileName, LOG_NAME_LEN - 1, "%s/%04d_%02d_%02d%s", 
            path_, t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, suffix_);
    toDay_ = t.tm_mday;

    {
        lock_guard<mutex> locker(mtx_);
        buff_.RetrieveAll();
        if(fp_) { 
            flush();
            fclose(fp_); 
        }

        fp_ = fopen(fileName, "a");
        if(fp_ == nullptr) {
            mkdir(path_, 0777);
            fp_ = fopen(fileName, "a");
        } 
        assert(fp_ != nullptr);
    }
}

void Log::write(int level, const char *format, ...) {
    struct timeval now = {0, 0};                         // 秒 和 微妙
    gettimeofday(&now, nullptr);
    time_t tSec = now.tv_sec;
    struct tm *sysTime = localtime(&tSec);
    struct tm t = *sysTime;
    va_list vaList;

    /* 日志日期 日志行数 */
    if (toDay_ != t.tm_mday || (lineCount_ && (lineCount_  %  MAX_LINES == 0)))
    {
        unique_lock<mutex> locker(mtx_);
        locker.unlock();
        
        char newFile[LOG_NAME_LEN];
        char tail[36] = {0};
        snprintf(tail, 36, "%04d_%02d_%02d", t.tm_year + 1900, t.tm_mon + 1, t.tm_mday);

        if (toDay_ != t.tm_mday)
        {
            snprintf(newFile, LOG_NAME_LEN - 72, "%s/%s%s", path_, tail, suffix_);
            toDay_ = t.tm_mday;
            lineCount_ = 0;
        }
        else {
            snprintf(newFile, LOG_NAME_LEN - 72, "%s/%s-%d%s", path_, tail, (lineCount_  / MAX_LINES), suffix_);
        }
        
        locker.lock();
        flush();
        fclose(fp_);
        fp_ = fopen(newFile, "a");
        assert(fp_ != nullptr);
    }

    {
        unique_lock<mutex> locker(mtx_);
        lineCount_++;
        int n = snprintf(buff_.BeginWrite(), 128, "%d-%02d-%02d %02d:%02d:%02d.%06ld ",
                    t.tm_year + 1900, t.tm_mon + 1, t.tm_mday,
                    t.tm_hour, t.tm_min, t.tm_sec, now.tv_usec);
                    
        buff_.HasWritten(n);
        AppendLogLevelTitle_(level);

        va_start(vaList, format);
        int m = vsnprintf(buff_.BeginWrite(), buff_.WritableBytes(), format, vaList);
        va_end(vaList);

        buff_.HasWritten(m);
        buff_.Append("\n\0", 2);

        if(isAsync_ && deque_ && !deque_->full()) {
            deque_->push_back(buff_.RetrieveAllToStr());
        } else {
            fputs(buff_.Peek(), fp_);
        }
        buff_.RetrieveAll();
    }
}

void Log::AppendLogLevelTitle_(int level) {
    switch(level) {
    case 0:
        buff_.Append("[debug]: ", 9);
        break;
    case 1:
        buff_.Append("[info] : ", 9);
        break;
    case 2:
        buff_.Append("[warn] : ", 9);
        break;
    case 3:
        buff_.Append("[error]: ", 9);
        break;
    default:
        buff_.Append("[info] : ", 9);
        break;
    }
}

void Log::flush() {
    if(isAsync_) { 
        deque_->flush(); 
    }
    fflush(fp_);
}

void Log::AsyncWrite_() {
    string str = "";
    while(deque_->pop(str)) {
        lock_guard<mutex> locker(mtx_);
        fputs(str.c_str(), fp_);
    }
}

Log* Log::Instance() {
    static Log inst;
    return &inst;
}

void Log::FlushLogThread() {
    Log::Instance()->AsyncWrite_();
}

blockqueue.h

#ifndef BLOCKQUEUE_H
#define BLOCKQUEUE_H

#include <mutex>  // 互斥锁
#include <deque>
#include <condition_variable>
#include <sys/time.h>

// 类模板
// 声明和实现写在一个文件里免得出错
template<class T>
class BlockDeque {
public:
    explicit BlockDeque(size_t MaxCapacity = 1000);  // 显示构造函数

    ~BlockDeque();

    void clear();

    bool empty();

    bool full();

    void Close();

    size_t size();

    size_t capacity();

    T front();

    T back();

    void push_back(const T &item);

    void push_front(const T &item);

    bool pop(T &item);

    bool pop(T &item, int timeout);

    void flush();

private:
    std::deque<T> deq_;       // T 类型的队列

    size_t capacity_;         // 容量

    std::mutex mtx_;          // 互斥锁

    bool isClose_;            // 是否关闭

    std::condition_variable condConsumer_;     // 条件变量 消费者

    std::condition_variable condProducer_;     // 条件变量 生产者
};


template<class T>
BlockDeque<T>::BlockDeque(size_t MaxCapacity) :capacity_(MaxCapacity) {
    assert(MaxCapacity > 0);
    isClose_ = false;
}

template<class T>
BlockDeque<T>::~BlockDeque() {
    Close();
};

template<class T>
void BlockDeque<T>::Close() {
    {   
        std::lock_guard<std::mutex> locker(mtx_);  // 程序在 std::lock_guard 的对象 locker 的生命周期中加锁  符合 RAII 机制
        deq_.clear();                              // 清空队列
        isClose_ = true;                           // 关闭标志为 true
    }
    condProducer_.notify_all();                    // 唤醒所有线程
    condConsumer_.notify_all();
};

template<class T>
void BlockDeque<T>::flush() {
    condConsumer_.notify_one();                    // 唤醒一个消费者线程
};

template<class T>
void BlockDeque<T>::clear() {                      // 清空队列
    std::lock_guard<std::mutex> locker(mtx_);
    deq_.clear();
}

template<class T>
T BlockDeque<T>::front() {                         // 返回队头元素
    std::lock_guard<std::mutex> locker(mtx_);
    return deq_.front();
}

template<class T>
T BlockDeque<T>::back() {                          // 返回队尾元素
    std::lock_guard<std::mutex> locker(mtx_);
    return deq_.back();
}

template<class T>
size_t BlockDeque<T>::size() {                     // 返回队列大小
    std::lock_guard<std::mutex> locker(mtx_);
    return deq_.size();
}

template<class T>
size_t BlockDeque<T>::capacity() {                 // 返回队列容量
    std::lock_guard<std::mutex> locker(mtx_);
    return capacity_;
}

template<class T>
void BlockDeque<T>::push_back(const T &item) {     // 尾插
    std::unique_lock<std::mutex> locker(mtx_);     // 这里是 unique_lock
    while(deq_.size() >= capacity_) {
        condProducer_.wait(locker);                // push 的时候 容量不足时,生产者线程在这等,等待pop函数中将其唤醒
    }
    deq_.push_back(item);                          // 容量够了 push 进去
    condConsumer_.notify_one();                    // 唤醒消费者线程
}

template<class T>
void BlockDeque<T>::push_front(const T &item) {        // 与上面同理
    std::unique_lock<std::mutex> locker(mtx_);
    while(deq_.size() >= capacity_) {
        condProducer_.wait(locker);
    }
    deq_.push_front(item);
    condConsumer_.notify_one();
}

template<class T>
bool BlockDeque<T>::empty() {                       // 判空
    std::lock_guard<std::mutex> locker(mtx_);
    return deq_.empty();
}

template<class T>
bool BlockDeque<T>::full(){                         // 判满
    std::lock_guard<std::mutex> locker(mtx_);
    return deq_.size() >= capacity_;
}

template<class T>
bool BlockDeque<T>::pop(T &item) {                  // 弹出队头元素 传出参数 item
    std::unique_lock<std::mutex> locker(mtx_);
    while(deq_.empty()){
        condConsumer_.wait(locker);
        if(isClose_){
            return false;
        }
    }
    item = deq_.front();
    deq_.pop_front();
    condProducer_.notify_one();
    return true;
}

template<class T>
bool BlockDeque<T>::pop(T &item, int timeout) {
    std::unique_lock<std::mutex> locker(mtx_);
    while(deq_.empty()){
        if(condConsumer_.wait_for(locker, std::chrono::seconds(timeout))    // 带等待时间的弹出
                == std::cv_status::timeout){
            return false;
        }
        if(isClose_){
            return false;
        }
    }
    item = deq_.front();
    deq_.pop_front();
    condProducer_.notify_one();
    return true;
}

#endif // BLOCKQUEUE_H

threadpool.h

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <functional>
class ThreadPool {
public:
    explicit ThreadPool(size_t threadCount = 8): pool_(std::make_shared<Pool>()) {
            assert(threadCount > 0);
            for(size_t i = 0; i < threadCount; i++) {                 // threadCount 个线程处理函数队列里的函数
                std::thread([pool = pool_] {
                    std::unique_lock<std::mutex> locker(pool->mtx);
                    while(true) {
                        if(!pool->tasks.empty()) {
                            auto task = std::move(pool->tasks.front());  // 操作任务队列时 要上锁
                            pool->tasks.pop();
                            locker.unlock();
                            task();                                      // 执行任务时解锁 其他线程可以操作队列
                            locker.lock();
                        } 
                        else if(pool->isClosed) break;
                        else pool->cond.wait(locker);                    // 没有任务了 等待
                    }
                }).detach();              // 使用 join 时 主线程阻塞  使用 detach(分离)主线程继续执行
            }
    }

    ThreadPool() = default;

    ThreadPool(ThreadPool&&) = default;
    
    ~ThreadPool() {
        if(static_cast<bool>(pool_)) {
            {
                std::lock_guard<std::mutex> locker(pool_->mtx);
                pool_->isClosed = true;
            }
            pool_->cond.notify_all();
        }
    }

    template<class F>
    void AddTask(F&& task) {
        {
            std::lock_guard<std::mutex> locker(pool_->mtx);
            pool_->tasks.emplace(std::forward<F>(task));       
        }
        pool_->cond.notify_one();                      // 加入任务了 唤醒一个线程
    }

private:
    struct Pool {                         // Pool 结构体
        std::mutex mtx;                   // 互斥锁
        std::condition_variable cond;     // 条件变量
        bool isClosed;                    // 关闭标志
        std::queue<std::function<void()>> tasks;   // 函数队列
    };
    std::shared_ptr<Pool> pool_;          // 指向 Pool 的只能指针
};


#endif //THREADPOOL_H

sqlconnpool.h


#ifndef SQLCONNPOOL_H
#define SQLCONNPOOL_H

#include <mysql/mysql.h>
#include <string>
#include <queue>
#include <mutex>
#include <semaphore.h>          // 信号量
#include <thread>
#include "../log/log.h"

class SqlConnPool {
public:
    static SqlConnPool *Instance();              // 单例模式

    MYSQL *GetConn();                            // 获取 mysql 连接
    void FreeConn(MYSQL * conn);                 // 释放 mysql 连接
    int GetFreeConnCount();                      // 获得空闲连接数量

    void Init(const char* host, int port,        // 初始化
              const char* user,const char* pwd, 
              const char* dbName, int connSize);
    void ClosePool();                            // 关闭连接池

private:
    SqlConnPool();
    ~SqlConnPool();

    int MAX_CONN_;                               // 最大连接数量
    int useCount_;                               // 用户数量
    int freeCount_;                              // 空闲连接数量

    std::queue<MYSQL *> connQue_;                // mysql连接队列
    std::mutex mtx_;                             // 互斥锁
    sem_t semId_;                                // 信号量
};


#endif // SQLCONNPOOL_H

sqlconnpool.cpp


#include "sqlconnpool.h"
using namespace std;

SqlConnPool::SqlConnPool() {                // 构造函数
    useCount_ = 0;
    freeCount_ = 0;
}

SqlConnPool* SqlConnPool::Instance() {
    static SqlConnPool connPool;             // 静态对象 全局就这一个
    return &connPool;
}

void SqlConnPool::Init(const char* host, int port,
            const char* user,const char* pwd, const char* dbName,
            int connSize = 10) {
    assert(connSize > 0);
    for (int i = 0; i < connSize; i++) {
        MYSQL *sql = nullptr;                  // 生命 MYSQL 指针
        sql = mysql_init(sql);                 // 初始化 sql 连接
        if (!sql) {
            LOG_ERROR("MySql init error!");
            assert(sql);
        }
        sql = mysql_real_connect(sql, host,    // 连接 mysql
                                 user, pwd,
                                 dbName, port, nullptr, 0);
        if (!sql) {
            LOG_ERROR("MySql Connect error!");
        }
        connQue_.push(sql);                    // 加入到连接队列
    }
    MAX_CONN_ = connSize;
    sem_init(&semId_, 0, MAX_CONN_);           // 初始化信号量       第二个参数为 0 时 当前进程的所有线程共享 否则在进程间共享 MAX_CONN_为信号量初始值 可用资源数目
}

MYSQL* SqlConnPool::GetConn() {
    MYSQL *sql = nullptr;
    if(connQue_.empty()){
        LOG_WARN("SqlConnPool busy!");
        return nullptr;
    }
    sem_wait(&semId_);                         // sem_wait 当信号量大于0时 使其减1 否则阻塞
    {
        lock_guard<mutex> locker(mtx_);        // 操作队列前要上锁
        sql = connQue_.front();                // 从连接队列中取出连接
        connQue_.pop();
    }
    return sql;
}

void SqlConnPool::FreeConn(MYSQL* sql) {       // 释放连接
    assert(sql);
    lock_guard<mutex> locker(mtx_);
    connQue_.push(sql);                        // 连接用完了 放回队列里
    sem_post(&semId_);                         // 信号量加一  也就是可用资源加一
}

void SqlConnPool::ClosePool() {                // 关闭连接池
    lock_guard<mutex> locker(mtx_);
    while(!connQue_.empty()) {
        auto item = connQue_.front();
        connQue_.pop();
        mysql_close(item);                     // 逐一关闭连接
    }
    mysql_library_end();                       // mysql使用结束后要调用这个函数 进行一些内存清理            
}

int SqlConnPool::GetFreeConnCount() {          // 可用连接数量  就是队列的长度
    lock_guard<mutex> locker(mtx_);
    return connQue_.size();
}

SqlConnPool::~SqlConnPool() {
    ClosePool();
}

sqlconnRAII.h

#ifndef SQLCONNRAII_H
#define SQLCONNRAII_H
#include "sqlconnpool.h"

/* 资源在对象构造初始化 资源在对象析构时释放*/ 
class SqlConnRAII {
public:
    SqlConnRAII(MYSQL** sql, SqlConnPool *connpool) {   // sql 是传出参数 
        assert(connpool);
        *sql = connpool->GetConn();                     // SqlConnRAII 的构造函数中 获得连接
        sql_ = *sql;
        connpool_ = connpool;
    }
    
    ~SqlConnRAII() {
        if(sql_) { connpool_->FreeConn(sql_); }         // SqlConnRAII 的析构函数中 释放连接
    }
    
private:
    MYSQL *sql_;                   // mysql 指针
    SqlConnPool* connpool_;        // 连接池 指针
};

#endif //SQLCONNRAII_H

定时器

heaptimer.h

#ifndef HEAP_TIMER_H
#define HEAP_TIMER_H

#include <queue>
#include <unordered_map>
#include <time.h>
#include <algorithm>
#include <arpa/inet.h> 
#include <functional> 
#include <assert.h> 
#include <chrono>                    // 时间工具
#include "../log/log.h"

typedef std::function<void()> TimeoutCallBack;
typedef std::chrono::high_resolution_clock Clock;   // 时钟
typedef std::chrono::milliseconds MS;               // 毫秒
typedef Clock::time_point TimeStamp;                // 时间点

struct TimerNode {                                  // 定时器节点
    int id;
    TimeStamp expires;                              // 过期时间
    TimeoutCallBack cb;
    bool operator<(const TimerNode& t) {
        return expires < t.expires;
    }
};
class HeapTimer {                                  // 时间堆类
public:
    HeapTimer() { heap_.reserve(64); }             

    ~HeapTimer() { clear(); }
    
    void adjust(int id, int newExpires);

    void add(int id, int timeOut, const TimeoutCallBack& cb);

    void doWork(int id);

    void clear();

    void tick();

    void pop();

    int GetNextTick();

private:
    void del_(size_t i);                         // 删除节点
    
    void siftup_(size_t i);                      // 向上调整
 
    bool siftdown_(size_t index, size_t n);      // 向下调整

    void SwapNode_(size_t i, size_t j);          // 交换节点

    std::vector<TimerNode> heap_;                // 堆

    std::unordered_map<int, size_t> ref_;        // id-下标 映射表
};

#endif //HEAP_TIMER_H

heaptimer.cpp

#include "heaptimer.h"

void HeapTimer::siftup_(size_t i) {                    // 向上调整
    assert(i >= 0 && i < heap_.size());
    size_t j = (i - 1) / 2;                             // j 是 i 的父节点
    while(j >= 0) {
        if(heap_[j] < heap_[i]) { break; }              // 父节点比子节点小  不需要调整
        SwapNode_(i, j);                                // 否则交换
        i = j;
        j = (i - 1) / 2;                                // 继续向上调整
    }
}

void HeapTimer::SwapNode_(size_t i, size_t j) {
    assert(i >= 0 && i < heap_.size());
    assert(j >= 0 && j < heap_.size());
    std::swap(heap_[i], heap_[j]);         // 交换节点
    ref_[heap_[i].id] = i;                 // 更新 映射关系
    ref_[heap_[j].id] = j;
} 

bool HeapTimer::siftdown_(size_t index, size_t n) {             // 向下调整
    assert(index >= 0 && index < heap_.size());
    assert(n >= 0 && n <= heap_.size());
    size_t i = index;
    size_t j = i * 2 + 1;                                    // 左子节点
    while(j < n) {
        if(j + 1 < n && heap_[j + 1] < heap_[j]) j++;        // j + 1 是右子节点   如果j+1更小  那么j = j+1   
        if(heap_[i] < heap_[j]) break;                       // i 比 j 小 不需要调整
        SwapNode_(i, j);                                     // 否则交换 i j
        i = j;                                               // 继续循环向下调整
        j = i * 2 + 1;
    }
    return i > index;                                        // 返回是否调整过
}

void HeapTimer::add(int id, int timeout, const TimeoutCallBack& cb) {
    assert(id >= 0);
    size_t i;
    if(ref_.count(id) == 0) {
        /* 新节点:堆尾插入,调整堆 */
        i = heap_.size();
        ref_[id] = i;
        heap_.push_back({id, Clock::now() + MS(timeout), cb});
        siftup_(i);
    } 
    else {
        /* 已有结点:调整堆 */
        i = ref_[id];
        heap_[i].expires = Clock::now() + MS(timeout);
        heap_[i].cb = cb;
        if(!siftdown_(i, heap_.size())) {
            siftup_(i);
        }
    }
}

void HeapTimer::doWork(int id) {
    /* 删除指定id结点,并触发回调函数 */
    if(heap_.empty() || ref_.count(id) == 0) {
        return;
    }
    size_t i = ref_[id];
    TimerNode node = heap_[i];
    node.cb();
    del_(i);
}

void HeapTimer::del_(size_t index) {
    /* 删除指定位置的结点 */
    assert(!heap_.empty() && index >= 0 && index < heap_.size());
    /* 将要删除的结点换到队尾,然后调整堆 */
    size_t i = index;
    size_t n = heap_.size() - 1;
    assert(i <= n);
    if(i < n) {
        SwapNode_(i, n);
        if(!siftdown_(i, n)) {
            siftup_(i);
        }
    }
    /* 队尾元素删除 */
    ref_.erase(heap_.back().id);
    heap_.pop_back();
}

void HeapTimer::adjust(int id, int timeout) {
    /* 调整指定id的结点 */
    assert(!heap_.empty() && ref_.count(id) > 0);
    heap_[ref_[id]].expires = Clock::now() + MS(timeout);;
    siftdown_(ref_[id], heap_.size());
}

void HeapTimer::tick() {
    /* 清除超时结点 */
    if(heap_.empty()) {
        return;
    }
    while(!heap_.empty()) {
        TimerNode node = heap_.front();
        if(std::chrono::duration_cast<MS>(node.expires - Clock::now()).count() > 0) { 
            break; 
        }
        node.cb();
        pop();
    }
}

void HeapTimer::pop() {
    assert(!heap_.empty());
    del_(0);
}

void HeapTimer::clear() {
    ref_.clear();
    heap_.clear();
}

int HeapTimer::GetNextTick() {
    tick();
    size_t res = -1;
    if(!heap_.empty()) {
        res = std::chrono::duration_cast<MS>(heap_.front().expires - Clock::now()).count();
        if(res < 0) { res = 0; }
    }
    return res;
}

HTTP 连接

httprequest.h

#ifndef HTTP_REQUEST_H
#define HTTP_REQUEST_H

#include <unordered_map>
#include <unordered_set>
#include <string>
#include <regex>           // 正则匹配
#include <errno.h>     
#include <mysql/mysql.h>  //mysql

#include "../buffer/buffer.h"
#include "../log/log.h"
#include "../pool/sqlconnpool.h"
#include "../pool/sqlconnRAII.h"

class HttpRequest {
public:
    enum PARSE_STATE {         // 定义行解析状态
        REQUEST_LINE,
        HEADERS,
        BODY,
        FINISH,        
    };

    enum HTTP_CODE {          // 定义 HTTP 状态码
        NO_REQUEST = 0,
        GET_REQUEST,
        BAD_REQUEST,
        NO_RESOURSE,
        FORBIDDENT_REQUEST,
        FILE_REQUEST,
        INTERNAL_ERROR,
        CLOSED_CONNECTION,
    };
    
    HttpRequest() { Init(); }
    ~HttpRequest() = default;

    void Init();
    bool parse(Buffer& buff);

    std::string path() const;
    std::string& path();
    std::string method() const;
    std::string version() const;
    std::string GetPost(const std::string& key) const;
    std::string GetPost(const char* key) const;

    bool IsKeepAlive() const;

    /* 
    todo 
    void HttpConn::ParseFormData() {}
    void HttpConn::ParseJson() {}
    */

private:
    bool ParseRequestLine_(const std::string& line);
    void ParseHeader_(const std::string& line);
    void ParseBody_(const std::string& line);

    void ParsePath_();
    void ParsePost_();
    void ParseFromUrlencoded_(int tag);

    static bool UserVerify(const std::string& name, const std::string& pwd, bool isLogin);

    PARSE_STATE state_;
    std::string method_, path_, version_, body_;               // 请求方法 请求路径 请求版本 请求体
    std::unordered_map<std::string, std::string> header_;      // 请求头  
    std::unordered_map<std::string, std::string> post_;

    static const std::unordered_set<std::string> DEFAULT_HTML;
    static const std::unordered_map<std::string, int> DEFAULT_HTML_TAG;
    static int ConverHex(char ch);
};


#endif //HTTP_REQUEST_H

httprequest.cpp

#include "httprequest.h"
using namespace std;

const unordered_set<string> HttpRequest::DEFAULT_HTML{
            "/index", "/register", "/login",
             "/welcome", "/video", "/picture","/sendmail" };      // 默认 html

const unordered_map<string, int> HttpRequest::DEFAULT_HTML_TAG {        // html 标记
            {"/register.html", 0}, {"/login.html", 1}, {"/sendmail.html", 2} };

void HttpRequest::Init() {
    method_ = path_ = version_ = body_ = "";
    state_ = REQUEST_LINE;
    header_.clear();
    post_.clear();
}

bool HttpRequest::IsKeepAlive() const {
    if(header_.count("Connection") == 1) {
        return header_.find("Connection")->second == "keep-alive" && version_ == "1.1";       // 是否 keep-alive
    }
    return false;
}

bool HttpRequest::parse(Buffer& buff) {                         // 缓冲区解析
    const char CRLF[] = "\r\n";                                 // 预定义 \r\n 数组
    if(buff.ReadableBytes() <= 0) {                             // 缓冲区没有内容 返回失败
        return false;
    }
    while(buff.ReadableBytes() && state_ != FINISH) {           // 缓冲区有内容 并且 http 状态不是 FINISH
        const char* lineEnd = search(buff.Peek(), buff.BeginWriteConst(), CRLF, CRLF + 2);  // search 参数代表两个左闭右开区间 
        std::string line(buff.Peek(), lineEnd);                 // 构造字符串  获得一行
        switch(state_)                                          // 有限状态机编程
        {
        case REQUEST_LINE:
            if(!ParseRequestLine_(line)) {                      // 解析请求行
                return false;
            }
            ParsePath_();
            break;    
        case HEADERS:
            ParseHeader_(line);                                 // 解析头部
            if(buff.ReadableBytes() <= 2) {
                state_ = FINISH;
            }
            break;
        case BODY:
            ParseBody_(line);                                   // 解析 body
            break;
        default:
            break;
        }
        if(lineEnd == buff.BeginWrite()) { break; }             // 缓冲区中无数据 
        buff.RetrieveUntil(lineEnd + 2);                        // 更新缓冲区的读位置
    }
    LOG_DEBUG("[%s], [%s], [%s]", method_.c_str(), path_.c_str(), version_.c_str());
    return true;                                                // 解析完成
}

void HttpRequest::ParsePath_() {                                // 解析请求路径
    if(path_ == "/") {                                          
        path_ = "/index.html"; 
    }
    else {
        for(auto &item: DEFAULT_HTML) {
            if(item == path_) {
                path_ += ".html";                                // 为请求路径加上html
                break;
            }
        }
    }
}

bool HttpRequest::ParseRequestLine_(const string& line) {                      // 解析请求行
    regex patten("^([^ ]*) ([^ ]*) HTTP/([^ ]*)$");                            // 正则匹配  用两个空格分隔 分别获得请求方法 请求路径 和 HTTP 版本
    smatch subMatch;                                                           // 匹配结果
    if(regex_match(line, subMatch, patten)) {   
        method_ = subMatch[1];
        path_ = subMatch[2];
        version_ = subMatch[3];
        state_ = HEADERS;                                                      // 更新状态为 获取头部
        return true;
    }
    LOG_ERROR("RequestLine Error");                                            // 匹配失败 行错误
    return false;
}

void HttpRequest::ParseHeader_(const string& line) {                           // 解析头部
    regex patten("^([^:]*): ?(.*)$");                                          // 用 冒号空格分割的字符串
    smatch subMatch;
    if(regex_match(line, subMatch, patten)) {
        header_[subMatch[1]] = subMatch[2];
    }
    else {                                    // 如果匹配失败 代表头部解析完成
        state_ = BODY;                        // 更新状态
    }
}

void HttpRequest::ParseBody_(const string& line) {                // 解析请求体
    body_ = line;                                                 // 请求体有内容是 post 请求
    ParsePost_();                                                 // 解析 post
    state_ = FINISH;                                              // 解析请求结束
    LOG_DEBUG("Body:%s, len:%d", line.c_str(), line.size());
}

int HttpRequest::ConverHex(char ch) {
    if(ch >= 'A' && ch <= 'F') return ch -'A' + 10;
    if(ch >= 'a' && ch <= 'f') return ch -'a' + 10;
    return ch;
}

void HttpRequest::ParsePost_() {
    if(method_ == "POST" && header_["Content-Type"] == "application/x-www-form-urlencoded") {   // 如果内容类型是 url编码
        if(DEFAULT_HTML_TAG.count(path_)) {
            int tag = DEFAULT_HTML_TAG.find(path_)->second;
            LOG_DEBUG("Tag:%d", tag);
            if(tag == 0 || tag == 1) {
                ParseFromUrlencoded_(tag);                                                                 // 从 url 中解析数据
                bool isLogin = (tag == 1);
                if(UserVerify(post_["username"], post_["password"], isLogin)) { // 若果是登录 要进行用户验证
                    path_ = "/welcome.html";
                } 
                else {
                    path_ = "/error.html";
                }
            }
            else if(tag == 2){
                ParseFromUrlencoded_(tag);
                string cmd = "python3 /root/MyWebServer/mail.py ";
                cmd += post_["mailto"];
                cmd += " ";
                cmd += post_["subject"];
                cmd += " ";
                cmd += post_["content"];
                // cout << cmd << endl;
                if(system(cmd.c_str()) == 0){
                    path_ = "/report.html";
                }
                else{
                    path_ = "/error.html";
                }
                
            }
        }
    }   
}

void HttpRequest::ParseFromUrlencoded_(int tag) {
    if(tag == 0 || tag == 1){
        if(body_.size() == 0) { return; }

        string key, value;
        int num = 0;
        int n = body_.size();
        int i = 0, j = 0;

        for(; i < n; i++) {
            char ch = body_[i];
            switch (ch) {
            case '=':
                key = body_.substr(j, i - j);   // 等号前的是 key
                j = i + 1;
                break;
            case '+':
                body_[i] = ' ';                 // url 编码中 空格被编码成加号
                break;
            case '%':
                num = ConverHex(body_[i + 1]) * 16 + ConverHex(body_[i + 2]);         // url 解码 从16进制转换成10进制
                body_[i + 2] = num % 10 + '0';
                body_[i + 1] = num / 10 + '0';
                i += 2;
                break;
            case '&':
                value = body_.substr(j, i - j);     // 等号和 & 之间的是 value
                j = i + 1;                          // j 是起始位置 更新
                post_[key] = value;
                LOG_DEBUG("%s = %s", key.c_str(), value.c_str());
                break;
            default:
                break;
            }
        }
        assert(j <= i);
        if(post_.count(key) == 0 && j < i) {            // 插入最后一个 value
            value = body_.substr(j, i - j);
            post_[key] = value;
        }
    }
    else if(tag == 2){
        if(body_.size() == 0){
            return;
        }
        string key,value;
        int n = body_.size();
        size_t i = 0,j = 0;
        while(body_.find('&',j) != -1){
            i = body_.find('&',j);
            size_t k = body_.find('=',j);
            key = body_.substr(j,k-j);
            value = body_.substr(k+1,i-k-1);
            j = i + 1;
            post_[key] = value;
        }
        assert(j < n);
        i = body_.find('=',j);
        key = body_.substr(j,i-j);
        value = body_.substr(i+1,n-i-1);
        post_[key] = value;
    }
}


bool HttpRequest::UserVerify(const string &name, const string &pwd, bool isLogin) {
    if(name == "" || pwd == "") { return false; }
    LOG_INFO("Verify name:%s pwd:%s", name.c_str(), pwd.c_str());
    MYSQL* sql;
    SqlConnRAII(&sql,  SqlConnPool::Instance());   // 获得 sql 连接
    assert(sql);
    
    bool flag = false;
    unsigned int j = 0;
    char order[256] = { 0 };                // 查询语句
    MYSQL_FIELD *fields = nullptr;          // 字段列数组
    MYSQL_RES *res = nullptr;               // 返回行的一个查询结果集
    
    if(!isLogin) { flag = true; }
    /* 查询用户及密码 */
    snprintf(order, 256, "SELECT username, password FROM user WHERE username='%s' LIMIT 1", name.c_str());
    LOG_DEBUG("%s", order);

    if(mysql_query(sql, order)) {   // 查询成功返回 0 
        mysql_free_result(res);
        return false; 
    }
    res = mysql_store_result(sql);      // 存储结果集
    j = mysql_num_fields(res);            // 字段数量
    fields = mysql_fetch_fields(res);      // 字段结构数组

    while(MYSQL_ROW row = mysql_fetch_row(res)) {   // 获得一行
        LOG_DEBUG("MYSQL ROW: %s %s", row[0], row[1]);
        string password(row[1]);                  // row[0] 是用户名 row[1] 是密码
        /* 登录行为 验证密码 */
        if(isLogin) {
            if(pwd == password) { flag = true; }
            else {
                flag = false;
                LOG_DEBUG("pwd error!");
            }
        } 
        else { 
            flag = false; 
            LOG_DEBUG("user used!");
        }
    }
    mysql_free_result(res);

    /* 注册行为 且 用户名未被使用*/
    if(!isLogin && flag == true) {
        LOG_DEBUG("regirster!");
        bzero(order, 256);
        snprintf(order, 256,"INSERT INTO user(username, password) VALUES('%s','%s')", name.c_str(), pwd.c_str());
        LOG_DEBUG( "%s", order);
        if(mysql_query(sql, order)) { 
            LOG_DEBUG( "Insert error!");
            flag = false; 
        }
        flag = true;
    }
    SqlConnPool::Instance()->FreeConn(sql);
    LOG_DEBUG( "UserVerify success!!");
    return flag;
}

std::string HttpRequest::path() const{
    return path_;
}

std::string& HttpRequest::path(){
    return path_;
}
std::string HttpRequest::method() const {
    return method_;
}

std::string HttpRequest::version() const {
    return version_;
}

std::string HttpRequest::GetPost(const std::string& key) const {
    assert(key != "");
    if(post_.count(key) == 1) {
        return post_.find(key)->second;
    }
    return "";
}

std::string HttpRequest::GetPost(const char* key) const {
    assert(key != nullptr);
    if(post_.count(key) == 1) {
        return post_.find(key)->second;
    }
    return "";
}

httpresponse.h

#ifndef HTTP_RESPONSE_H
#define HTTP_RESPONSE_H

#include <unordered_map>
#include <fcntl.h>       // open
#include <unistd.h>      // close
#include <sys/stat.h>    // stat
#include <sys/mman.h>    // mmap, munmap  共享内存

#include "../buffer/buffer.h"
#include "../log/log.h"

class HttpResponse {
public:
    HttpResponse();
    ~HttpResponse();

    void Init(const std::string& srcDir, std::string& path, bool isKeepAlive = false, int code = -1);
    void MakeResponse(Buffer& buff);
    void UnmapFile();
    char* File();
    size_t FileLen() const;
    void ErrorContent(Buffer& buff, std::string message);
    int Code() const { return code_; }

private:
    void AddStateLine_(Buffer &buff);
    void AddHeader_(Buffer &buff);
    void AddContent_(Buffer &buff);

    void ErrorHtml_();
    std::string GetFileType_();

    int code_;
    bool isKeepAlive_;

    std::string path_;
    std::string srcDir_;
    
    char* mmFile_; 
    struct stat mmFileStat_;

    static const std::unordered_map<std::string, std::string> SUFFIX_TYPE;
    static const std::unordered_map<int, std::string> CODE_STATUS;
    static const std::unordered_map<int, std::string> CODE_PATH;
};


#endif //HTTP_RESPONSE_H

httpresponse.cpp

#include "httpresponse.h"

using namespace std;

const unordered_map<string, string> HttpResponse::SUFFIX_TYPE = {               // 后缀类型
    { ".html",  "text/html" },
    { ".xml",   "text/xml" },
    { ".xhtml", "application/xhtml+xml" },
    { ".txt",   "text/plain" },
    { ".rtf",   "application/rtf" },
    { ".pdf",   "application/pdf" },
    { ".word",  "application/nsword" },
    { ".png",   "image/png" },
    { ".gif",   "image/gif" },
    { ".jpg",   "image/jpeg" },
    { ".jpeg",  "image/jpeg" },
    { ".au",    "audio/basic" },
    { ".mpeg",  "video/mpeg" },
    { ".mpg",   "video/mpeg" },
    { ".avi",   "video/x-msvideo" },
    { ".gz",    "application/x-gzip" },
    { ".tar",   "application/x-tar" },
    { ".css",   "text/css "},
    { ".js",    "text/javascript "},
};

const unordered_map<int, string> HttpResponse::CODE_STATUS = {             // 响应状态码
    { 200, "OK" },
    { 400, "Bad Request" },
    { 403, "Forbidden" },
    { 404, "Not Found" },
};

const unordered_map<int, string> HttpResponse::CODE_PATH = {               //  状态码 和 文件映射
    { 400, "/400.html" },
    { 403, "/403.html" },
    { 404, "/404.html" },
};

HttpResponse::HttpResponse() {                      // 构造函数
    code_ = -1;
    path_ = srcDir_ = "";
    isKeepAlive_ = false;
    mmFile_ = nullptr; 
    mmFileStat_ = { 0 };
};

HttpResponse::~HttpResponse() {                     // 析构函数
    UnmapFile();
}

void HttpResponse::Init(const string& srcDir, string& path, bool isKeepAlive, int code){      // 初始化       
    assert(srcDir != "");
    if(mmFile_) { UnmapFile(); }
    code_ = code;
    isKeepAlive_ = isKeepAlive;
    path_ = path;
    srcDir_ = srcDir;
    mmFile_ = nullptr; 
    mmFileStat_ = { 0 };                  // 文件信息结构体 stat 
}

void HttpResponse::MakeResponse(Buffer& buff) {
    /* 判断请求的资源文件 */
    if(stat((srcDir_ + path_).data(), &mmFileStat_) < 0 || S_ISDIR(mmFileStat_.st_mode)) {       // stat 函数查看文件状态 第一个参数是文件名 第二个是传出参数      S_ISDIR 宏 检查文件是否是目录
        code_ = 404;                                      // 没有这个文件 或者是 目录 返回 404
    }
    else if(!(mmFileStat_.st_mode & S_IROTH)) {             // S_IROTH 其他组 读权限
        code_ = 403;                                      // 拒绝
    }
    else if(code_ == -1) {                             
        code_ = 200; 
    }
    ErrorHtml_();                                        // 生成错误页面
    AddStateLine_(buff);                                 // 向缓冲区中添加状态行
    AddHeader_(buff);
    AddContent_(buff);
}

char* HttpResponse::File() {
    return mmFile_;
}

size_t HttpResponse::FileLen() const {         // 文件大小
    return mmFileStat_.st_size;
}

void HttpResponse::ErrorHtml_() {
    if(CODE_PATH.count(code_) == 1) {
        path_ = CODE_PATH.find(code_)->second;
        stat((srcDir_ + path_).data(), &mmFileStat_);
    }
}

void HttpResponse::AddStateLine_(Buffer& buff) {
    string status;
    if(CODE_STATUS.count(code_) == 1) {
        status = CODE_STATUS.find(code_)->second;
    }
    else {                                        
        code_ = 400;
        status = CODE_STATUS.find(400)->second;
    }
    buff.Append("HTTP/1.1 " + to_string(code_) + " " + status + "\r\n");        // 向缓冲区中写入状态行
}

void HttpResponse::AddHeader_(Buffer& buff) {              // 添加响应头部信息
    buff.Append("Connection: "); 
    if(isKeepAlive_) {
        buff.Append("keep-alive\r\n");
        buff.Append("keep-alive: max=6, timeout=120\r\n");
    } else{
        buff.Append("close\r\n");
    }
    buff.Append("Content-type: " + GetFileType_() + "\r\n");
}

void HttpResponse::AddContent_(Buffer& buff) {                // 添加响应内容
    int srcFd = open((srcDir_ + path_).data(), O_RDONLY);     // 只读形式打开文件
    if(srcFd < 0) { 
        ErrorContent(buff, "File NotFound!");
        return; 
    }

    /* 将文件映射到内存提高文件的访问速度 
        MAP_PRIVATE 建立一个写入时拷贝的私有映射*/
    LOG_DEBUG("file path %s", (srcDir_ + path_).data());
    int* mmRet = (int*)mmap(0, mmFileStat_.st_size, PROT_READ, MAP_PRIVATE, srcFd, 0);         // 将文件映射到共享内存
    if(*mmRet == -1) {
        ErrorContent(buff, "File NotFound!");
        return; 
    }
    mmFile_ = (char*)mmRet;         
    close(srcFd);
    buff.Append("Content-length: " + to_string(mmFileStat_.st_size) + "\r\n\r\n"); 
}

void HttpResponse::UnmapFile() {
    if(mmFile_) {
        munmap(mmFile_, mmFileStat_.st_size);  // 关闭共享内存
        mmFile_ = nullptr;
    }
}

string HttpResponse::GetFileType_() {
    /* 判断文件类型 */
    string::size_type idx = path_.find_last_of('.');
    if(idx == string::npos) {
        return "text/plain";
    }
    string suffix = path_.substr(idx);
    if(SUFFIX_TYPE.count(suffix) == 1) {
        return SUFFIX_TYPE.find(suffix)->second;
    }
    return "text/plain";
}

void HttpResponse::ErrorContent(Buffer& buff, string message) 
{
    string body;
    string status;
    body += "<html><title>Error</title>";
    body += "<body bgcolor=\"ffffff\">";
    if(CODE_STATUS.count(code_) == 1) {
        status = CODE_STATUS.find(code_)->second;
    } else {
        status = "Bad Request";
    }
    body += to_string(code_) + " : " + status  + "\n";
    body += "<p>" + message + "</p>";
    body += "<hr><em>TinyWebServer</em></body></html>";

    buff.Append("Content-length: " + to_string(body.size()) + "\r\n\r\n");
    buff.Append(body);
}

httpconn.h

#ifndef HTTP_CONN_H
#define HTTP_CONN_H

#include <sys/types.h>
#include <sys/uio.h>     // readv/writev
#include <arpa/inet.h>   // sockaddr_in
#include <stdlib.h>      // atoi()
#include <errno.h>      

#include "../log/log.h"
#include "../pool/sqlconnRAII.h"
#include "../buffer/buffer.h"
#include "httprequest.h"
#include "httpresponse.h"

class HttpConn {
public:
    HttpConn();

    ~HttpConn();

    void init(int sockFd, const sockaddr_in& addr);

    ssize_t read(int* saveErrno);

    ssize_t write(int* saveErrno);

    void Close();

    int GetFd() const;

    int GetPort() const;

    const char* GetIP() const;
    
    sockaddr_in GetAddr() const;
    
    bool process();

    int ToWriteBytes() { 
        return iov_[0].iov_len + iov_[1].iov_len; 
    }

    bool IsKeepAlive() const {
        return request_.IsKeepAlive();
    }

    static bool isET;
    static const char* srcDir;        // 资源目录
    static std::atomic<int> userCount;
    
private:
   
    int fd_;
    struct  sockaddr_in addr_;

    bool isClose_;
    
    int iovCnt_;
    struct iovec iov_[2]; 
    
    Buffer readBuff_; // 读缓冲区
    Buffer writeBuff_; // 写缓冲区

    HttpRequest request_; // 请求
    HttpResponse response_; // 响应
};


#endif //HTTP_CONN_H

httpconn.cpp


#include "httpconn.h"
using namespace std;

const char* HttpConn::srcDir;
std::atomic<int> HttpConn::userCount;
bool HttpConn::isET;

HttpConn::HttpConn() { 
    fd_ = -1;
    addr_ = { 0 };
    isClose_ = true;
};

HttpConn::~HttpConn() { 
    Close(); 
};

void HttpConn::init(int fd, const sockaddr_in& addr) {         // 使用socket连接文件描述符 和 socket 地址初始化 http 连接
    assert(fd > 0);
    userCount++;
    addr_ = addr;
    fd_ = fd;
    writeBuff_.RetrieveAll();
    readBuff_.RetrieveAll();
    isClose_ = false;
    LOG_INFO("Client[%d](%s:%d) in, userCount:%d", fd_, GetIP(), GetPort(), (int)userCount);
}

void HttpConn::Close() {
    response_.UnmapFile();
    if(isClose_ == false){
        isClose_ = true; 
        userCount--;
        close(fd_);
        LOG_INFO("Client[%d](%s:%d) quit, UserCount:%d", fd_, GetIP(), GetPort(), (int)userCount);
    }
}

int HttpConn::GetFd() const {
    return fd_;
};

struct sockaddr_in HttpConn::GetAddr() const {
    return addr_;
}

const char* HttpConn::GetIP() const {
    return inet_ntoa(addr_.sin_addr);
}

int HttpConn::GetPort() const {
    return addr_.sin_port;
}

ssize_t HttpConn::read(int* saveErrno) {
    ssize_t len = -1;
    do {
        len = readBuff_.ReadFd(fd_, saveErrno);                // 从连接中读取数据到读缓冲区
        if (len <= 0) {
            break;
        }
    } while (isET);
    return len;
}

ssize_t HttpConn::write(int* saveErrno) {
    ssize_t len = -1;
    do {
        len = writev(fd_, iov_, iovCnt_);
        if(len <= 0) {
            *saveErrno = errno;
            break;
        }
        if(iov_[0].iov_len + iov_[1].iov_len  == 0) { break; } /* 传输结束 */
        else if(static_cast<size_t>(len) > iov_[0].iov_len) {                                // 如果第一块传完了 第二块传了一部分
            iov_[1].iov_base = (uint8_t*) iov_[1].iov_base + (len - iov_[0].iov_len);        // 更新第二块的 iov_base
            iov_[1].iov_len -= (len - iov_[0].iov_len);                                      // 更新第二块的 iov_len
            if(iov_[0].iov_len) {                  // 如果第一块长度不是 0
                writeBuff_.RetrieveAll();          // 重置写缓冲区
                iov_[0].iov_len = 0;               // 第一块长度置零
            }
        }
        else {                                                                           // 第一块还没有传完
            iov_[0].iov_base = (uint8_t*)iov_[0].iov_base + len;                         // 更新第一块的起始地址
            iov_[0].iov_len -= len;                                                      // 更新第一块的长度
            writeBuff_.Retrieve(len);                                                    // 写缓冲区删除 len 字节
        }
    } while(isET || ToWriteBytes() > 10240);
    return len;
}

bool HttpConn::process() {
    request_.Init();
    if(readBuff_.ReadableBytes() <= 0) {
        return false;
    }
    else if(request_.parse(readBuff_)) {
        LOG_DEBUG("%s", request_.path().c_str());
        response_.Init(srcDir, request_.path(), request_.IsKeepAlive(), 200);
    } else {
        response_.Init(srcDir, request_.path(), false, 400);
    }

    response_.MakeResponse(writeBuff_);
    /* 响应头 */
    iov_[0].iov_base = const_cast<char*>(writeBuff_.Peek());
    iov_[0].iov_len = writeBuff_.ReadableBytes();
    iovCnt_ = 1;

    /* 文件 */
    if(response_.FileLen() > 0  && response_.File()) {
        iov_[1].iov_base = response_.File();
        iov_[1].iov_len = response_.FileLen();
        iovCnt_ = 2;
    }
    LOG_DEBUG("filesize:%d, %d  to %d", response_.FileLen() , iovCnt_, ToWriteBytes());
    return true;
}

Server

epoller.h


#ifndef EPOLLER_H
#define EPOLLER_H

#include <sys/epoll.h> //epoll_ctl()
#include <fcntl.h>  // fcntl()
#include <unistd.h> // close()
#include <assert.h> // close()
#include <vector>
#include <errno.h>

class Epoller {
public:
    explicit Epoller(int maxEvent = 1024);

    ~Epoller();

    bool AddFd(int fd, uint32_t events);

    bool ModFd(int fd, uint32_t events);

    bool DelFd(int fd);

    int Wait(int timeoutMs = -1);

    int GetEventFd(size_t i) const;

    uint32_t GetEvents(size_t i) const;
        
private:
    int epollFd_;

    std::vector<struct epoll_event> events_;    
};

#endif //EPOLLER_H

epoller.cpp

#include "epoller.h"

Epoller::Epoller(int maxEvent):epollFd_(epoll_create(512)), events_(maxEvent){
    assert(epollFd_ >= 0 && events_.size() > 0);
}

Epoller::~Epoller() {
    close(epollFd_);
}

bool Epoller::AddFd(int fd, uint32_t events) {
    if(fd < 0) return false;
    epoll_event ev = {0};
    ev.data.fd = fd;
    ev.events = events;
    return 0 == epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &ev);
}

bool Epoller::ModFd(int fd, uint32_t events) {
    if(fd < 0) return false;
    epoll_event ev = {0};
    ev.data.fd = fd;
    ev.events = events;
    return 0 == epoll_ctl(epollFd_, EPOLL_CTL_MOD, fd, &ev);
}

bool Epoller::DelFd(int fd) {
    if(fd < 0) return false;
    epoll_event ev = {0};
    return 0 == epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, &ev);
}

int Epoller::Wait(int timeoutMs) {
    return epoll_wait(epollFd_, &events_[0], static_cast<int>(events_.size()), timeoutMs);
}

int Epoller::GetEventFd(size_t i) const {
    assert(i < events_.size() && i >= 0);
    return events_[i].data.fd;
}

uint32_t Epoller::GetEvents(size_t i) const {
    assert(i < events_.size() && i >= 0);
    return events_[i].events;
}

webserver.h

#ifndef WEBSERVER_H
#define WEBSERVER_H

#include <unordered_map>
#include <fcntl.h>       // fcntl()
#include <unistd.h>      // close()
#include <assert.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "epoller.h"
#include "../log/log.h"
#include "../timer/heaptimer.h"
#include "../pool/sqlconnpool.h"
#include "../pool/threadpool.h"
#include "../pool/sqlconnRAII.h"
#include "../http/httpconn.h"

class WebServer {
public:
    WebServer(
        int port, int trigMode, int timeoutMS, bool OptLinger, 
        int sqlPort, const char* sqlUser, const  char* sqlPwd, 
        const char* dbName, int connPoolNum, int threadNum,
        bool openLog, int logLevel, int logQueSize);

    ~WebServer();
    void Start();

private:
    bool InitSocket_(); 
    void InitEventMode_(int trigMode);
    void AddClient_(int fd, sockaddr_in addr);
  
    void DealListen_();
    void DealWrite_(HttpConn* client);
    void DealRead_(HttpConn* client);

    void SendError_(int fd, const char*info);
    void ExtentTime_(HttpConn* client);
    void CloseConn_(HttpConn* client);

    void OnRead_(HttpConn* client);
    void OnWrite_(HttpConn* client);
    void OnProcess(HttpConn* client);

    static const int MAX_FD = 65536;

    static int SetFdNonblock(int fd);

    int port_;
    bool openLinger_;
    int timeoutMS_;  /* 毫秒MS */
    bool isClose_;
    int listenFd_;
    char* srcDir_;
    
    uint32_t listenEvent_;
    uint32_t connEvent_;
   
    std::unique_ptr<HeapTimer> timer_;
    std::unique_ptr<ThreadPool> threadpool_;
    std::unique_ptr<Epoller> epoller_;
    std::unordered_map<int, HttpConn> users_;
};


#endif //WEBSERVER_H

webserver.cpp

#include "webserver.h"

using namespace std;

WebServer::WebServer(
            int port, int trigMode, int timeoutMS, bool OptLinger,
            int sqlPort, const char* sqlUser, const  char* sqlPwd,
            const char* dbName, int connPoolNum, int threadNum,
            bool openLog, int logLevel, int logQueSize):
            port_(port), openLinger_(OptLinger), timeoutMS_(timeoutMS), isClose_(false),
            timer_(new HeapTimer()), threadpool_(new ThreadPool(threadNum)), epoller_(new Epoller())
    {
    srcDir_ = getcwd(nullptr, 256);               // 获取当前工作目录
    assert(srcDir_);
    strncat(srcDir_, "/resources/", 16);          // 字符串拼接
    HttpConn::userCount = 0;                      
    HttpConn::srcDir = srcDir_;
    SqlConnPool::Instance()->Init("localhost", sqlPort, sqlUser, sqlPwd, dbName, connPoolNum);       // 数据库连接初始化

    InitEventMode_(trigMode);                         // 初始化 epoll 模式
    if(!InitSocket_()) { isClose_ = true;}            // 初始化 socket 建立监听连接

    if(openLog) {
        Log::Instance()->init(logLevel, "./log", ".log", logQueSize);
        if(isClose_) { LOG_ERROR("========== Server init error!=========="); }
        else {
            LOG_INFO("========== Server init ==========");
            LOG_INFO("Port:%d, OpenLinger: %s", port_, OptLinger? "true":"false");
            LOG_INFO("Listen Mode: %s, OpenConn Mode: %s",
                            (listenEvent_ & EPOLLET ? "ET": "LT"),
                            (connEvent_ & EPOLLET ? "ET": "LT"));
            LOG_INFO("LogSys level: %d", logLevel);
            LOG_INFO("srcDir: %s", HttpConn::srcDir);
            LOG_INFO("SqlConnPool num: %d, ThreadPool num: %d", connPoolNum, threadNum);
        }
    }
}

WebServer::~WebServer() {
    close(listenFd_);
    isClose_ = true;
    free(srcDir_);
    SqlConnPool::Instance()->ClosePool();
}

void WebServer::InitEventMode_(int trigMode) {
    listenEvent_ = EPOLLRDHUP;                  // 读事件
    connEvent_ = EPOLLONESHOT | EPOLLRDHUP;      //  EPOLLONESHOT oneshot 只触发一次
    switch (trigMode)                          // 选择模式
    {
    case 0:
        break;
    case 1:
        connEvent_ |= EPOLLET;
        break;
    case 2:
        listenEvent_ |= EPOLLET;
        break;
    case 3:
        listenEvent_ |= EPOLLET;
        connEvent_ |= EPOLLET;
        break;
    default:
        listenEvent_ |= EPOLLET;
        connEvent_ |= EPOLLET;
        break;
    }
    HttpConn::isET = (connEvent_ & EPOLLET);
}

void WebServer::Start() {
    int timeMS = -1;  /* epoll wait timeout == -1 无事件将阻塞 */
    if(!isClose_) { LOG_INFO("========== Server start =========="); }
    while(!isClose_) {
        if(timeoutMS_ > 0) {
            timeMS = timer_->GetNextTick();         // 函数内部调用了一次 tick 清除超时节点
        }
        int eventCnt = epoller_->Wait(timeMS);
        for(int i = 0; i < eventCnt; i++) {
            /* 处理事件 */
            int fd = epoller_->GetEventFd(i);
            uint32_t events = epoller_->GetEvents(i);
            if(fd == listenFd_) {
                DealListen_();                   // 监听文件描述符的事件   有新的连接  建立新连接
            }
            else if(events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) {      // 这些错误事件关闭连接
                assert(users_.count(fd) > 0);
                CloseConn_(&users_[fd]);
            }
            else if(events & EPOLLIN) {                // 可读事件
                assert(users_.count(fd) > 0);
                DealRead_(&users_[fd]);                // 处理读
            }
            else if(events & EPOLLOUT) {               // 写事件
                assert(users_.count(fd) > 0);
                DealWrite_(&users_[fd]);               // 处理写
            } else {
                LOG_ERROR("Unexpected event");
            }
        }
    }
}

void WebServer::SendError_(int fd, const char*info) {
    assert(fd > 0);
    int ret = send(fd, info, strlen(info), 0);
    if(ret < 0) {
        LOG_WARN("send error to client[%d] error!", fd);
    }
    close(fd);
}

void WebServer::CloseConn_(HttpConn* client) {            // 关闭 http 连接
    assert(client);
    LOG_INFO("Client[%d] quit!", client->GetFd());
    epoller_->DelFd(client->GetFd());
    client->Close();
}

void WebServer::AddClient_(int fd, sockaddr_in addr) {
    assert(fd > 0);
    users_[fd].init(fd, addr);
    if(timeoutMS_ > 0) {
        timer_->add(fd, timeoutMS_, std::bind(&WebServer::CloseConn_, this, &users_[fd]));   //加入定时器 利用 bind 加 functional 实现回调函数  关闭连接
    }
    epoller_->AddFd(fd, EPOLLIN | connEvent_);  // 在 epoll 中注册 客户端连接 读事件
    SetFdNonblock(fd);                          // 设置文件描述符 非阻塞
    LOG_INFO("Client[%d] in!", users_[fd].GetFd());
}

void WebServer::DealListen_() {                        // 处理监听 socket 上的事件
    struct sockaddr_in addr;
    socklen_t len = sizeof(addr);
    do {
        int fd = accept(listenFd_, (struct sockaddr *)&addr, &len);
        if(fd <= 0) { return;}
        else if(HttpConn::userCount >= MAX_FD) {
            SendError_(fd, "Server busy!");
            LOG_WARN("Clients is full!");
            return;
        }
        AddClient_(fd, addr);
    } while(listenEvent_ & EPOLLET);  // ET 模式要循环  
}

void WebServer::DealRead_(HttpConn* client) {
    assert(client);
    ExtentTime_(client);
    threadpool_->AddTask(std::bind(&WebServer::OnRead_, this, client));  // 在线程池中加入读任务
}

void WebServer::DealWrite_(HttpConn* client) {
    assert(client);
    ExtentTime_(client);
    threadpool_->AddTask(std::bind(&WebServer::OnWrite_, this, client));  // 线程池中加入写任务
}

void WebServer::ExtentTime_(HttpConn* client) {
    assert(client);
    if(timeoutMS_ > 0) { timer_->adjust(client->GetFd(), timeoutMS_); }  // 调整定时器  
}

void WebServer::OnRead_(HttpConn* client) {       // 读取数据
    assert(client);
    int ret = -1;
    int readErrno = 0;
    ret = client->read(&readErrno);
    if(ret <= 0 && readErrno != EAGAIN) {
        CloseConn_(client);
        return;
    }
    OnProcess(client);
}

void WebServer::OnProcess(HttpConn* client) {
    if(client->process()) {
        epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLOUT);       // 处理成功 在 epoll 中加入写事件
    } else {
        epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLIN);        // 读缓冲区 没有内容 在 epoll 中注册读事件
    }
}

void WebServer::OnWrite_(HttpConn* client) {
    assert(client);
    int ret = -1;
    int writeErrno = 0;
    ret = client->write(&writeErrno);
    if(client->ToWriteBytes() == 0) {
        /* 传输完成 */
        if(client->IsKeepAlive()) {
            OnProcess(client);
            return;
        }
    }
    else if(ret < 0) {
        if(writeErrno == EAGAIN) {
            /* 继续传输 */
            epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLOUT);
            return;
        }
    }
    CloseConn_(client);
}

/* Create listenFd */
bool WebServer::InitSocket_() {
    int ret;
    struct sockaddr_in addr;
    if(port_ > 65535 || port_ < 80) {
        LOG_ERROR("Port:%d error!",  port_);
        return false;
    }
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);  // 主机字节序 转 网络字节序
    addr.sin_port = htons(port_);              // 主机字节序 转 网络字节序
    struct linger optLinger = { 0 };           // 优雅关闭连接结构体
    if(openLinger_) {
        /* 优雅关闭: 直到所剩数据发送完毕或超时 */
        optLinger.l_onoff = 1;
        optLinger.l_linger = 1;
    }

    listenFd_ = socket(AF_INET, SOCK_STREAM, 0);           // 创建 sokcet 连接
    if(listenFd_ < 0) {
        LOG_ERROR("Create socket error!", port_);
        return false;
    }

    ret = setsockopt(listenFd_, SOL_SOCKET, SO_LINGER, &optLinger, sizeof(optLinger));  // 设置优雅关闭连接选项
    if(ret < 0) {
        close(listenFd_);
        LOG_ERROR("Init linger error!", port_);
        return false;
    }

    int optval = 1;
    /* 端口复用 */
    /* 只有最后一个套接字会正常接收数据。 */
    ret = setsockopt(listenFd_, SOL_SOCKET, SO_REUSEADDR, (const void*)&optval, sizeof(int));  // 设置端口复用选项
    if(ret == -1) {
        LOG_ERROR("set socket setsockopt error !");
        close(listenFd_);
        return false;
    }

    ret = bind(listenFd_, (struct sockaddr *)&addr, sizeof(addr));
    if(ret < 0) {
        LOG_ERROR("Bind Port:%d error!", port_);
        close(listenFd_);
        return false;
    }

    ret = listen(listenFd_, 6);
    if(ret < 0) {
        LOG_ERROR("Listen port:%d error!", port_);
        close(listenFd_);
        return false;
    }
    ret = epoller_->AddFd(listenFd_,  listenEvent_ | EPOLLIN); // 监听连接
    if(ret == 0) {
        LOG_ERROR("Add listen error!");
        close(listenFd_);
        return false;
    }
    SetFdNonblock(listenFd_);                   // 设置文件描述符 非阻塞
    LOG_INFO("Server port:%d", port_);
    return true;
}

int WebServer::SetFdNonblock(int fd) {
    assert(fd > 0);
    return fcntl(fd, F_SETFL, fcntl(fd, F_GETFD, 0) | O_NONBLOCK);
}