注意:这篇文章上次更新于1238天前,文章内容可能已经过时。
WebServer
缓冲区 Buffer
buffer.h
#ifndef BUFFER_H
#define BUFFER_H
#include <cstring>    // C string / memory helpers
#include <iostream>
#include <unistd.h>   // write
#include <sys/uio.h>  // readv
#include <vector>     // backing storage
#include <atomic>     // std::atomic offsets
#include <assert.h>

// Growable byte buffer with separate read and write cursors.
//
// Layout of the backing vector:
//   [0, readPos_)         bytes already consumed ("prependable" space)
//   [readPos_, writePos_) bytes waiting to be read
//   [writePos_, size())   free space available for writing
class Buffer {
public:
    // Creates a buffer whose backing storage holds initBuffSize bytes.
    Buffer(int initBuffSize = 1024);
    ~Buffer() = default;

    size_t WritableBytes() const;     // free bytes: total size - write position
    size_t ReadableBytes() const ;    // unread bytes: write position - read position
    size_t PrependableBytes() const;  // consumed bytes: the read position

    const char* Peek() const;          // pointer to the first unread byte
    void EnsureWriteable(size_t len);  // guarantees at least len writable bytes
    void HasWritten(size_t len);       // advances the write position by len after an external write

    void Retrieve(size_t len);            // advances the read position by len
    void RetrieveUntil(const char* end);  // advances the read position up to end

    void RetrieveAll() ;             // resets the buffer (both cursors back to 0)
    std::string RetrieveAllToStr();  // returns the unread bytes as a string, then resets the buffer

    const char* BeginWriteConst() const;  // pointer to the first writable byte
    char* BeginWrite();                   // pointer to the first writable byte

    void Append(const std::string& str);
    // Core overload: copies len bytes starting at str into the buffer.
    // The other three overloads forward to this one.
    void Append(const char* str, size_t len);
    void Append(const void* data, size_t len);
    void Append(const Buffer& buff);

    ssize_t ReadFd(int fd, int* Errno);   // reads from the file descriptor into the buffer
    ssize_t WriteFd(int fd, int* Errno);  // writes the unread bytes to the file descriptor

private:
    char* BeginPtr_();              // address of the first byte of the backing storage
    const char* BeginPtr_() const;  // address of the first byte of the backing storage
    void MakeSpace_(size_t len);    // makes room for len more bytes

    std::vector<char> buffer_;  // backing storage, initially initBuffSize bytes
    // Read / write offsets (indices into buffer_).
    // NOTE(review): the offsets are atomic, but that alone does not make
    // compound operations on the buffer thread-safe - confirm callers serialize access.
    std::atomic<std::size_t> readPos_;
    std::atomic<std::size_t> writePos_;
};

#endif //BUFFER_H
buffer.cpp
#include "buffer.h"

/// Creates a buffer with initBuffSize bytes of storage and both cursors at 0.
Buffer::Buffer(int initBuffSize) : buffer_(initBuffSize), readPos_(0), writePos_(0) {}

/// Number of bytes written but not yet consumed.
size_t Buffer::ReadableBytes() const {
    return writePos_ - readPos_;
}

/// Number of bytes that can be written without growing or compacting.
size_t Buffer::WritableBytes() const {
    return buffer_.size() - writePos_;
}

/// Number of already-consumed bytes in front of the read cursor.
size_t Buffer::PrependableBytes() const {
    return readPos_;
}

/// Pointer to the first unread byte.
const char* Buffer::Peek() const {
    return BeginPtr_() + readPos_;
}

/// Consumes len bytes; len must not exceed ReadableBytes().
void Buffer::Retrieve(size_t len) {
    assert(len <= ReadableBytes());
    readPos_ += len;
}

/// Consumes everything up to (not including) end, which must point into the unread region.
void Buffer::RetrieveUntil(const char* end) {
    assert(Peek() <= end);
    Retrieve(end - Peek());
}

/// Zeroes the storage and rewinds both cursors.
void Buffer::RetrieveAll() {
    // data() may be null for an empty vector; never hand that to memset.
    if (!buffer_.empty()) {
        std::memset(buffer_.data(), 0, buffer_.size());
    }
    readPos_ = 0;
    writePos_ = 0;
}

/// Returns the unread bytes as a string and resets the buffer.
std::string Buffer::RetrieveAllToStr() {
    std::string str(Peek(), ReadableBytes());
    RetrieveAll();
    return str;
}

/// Pointer to the first writable byte (read-only view).
const char* Buffer::BeginWriteConst() const {
    return BeginPtr_() + writePos_;
}

/// Pointer to the first writable byte.
char* Buffer::BeginWrite() {
    return BeginPtr_() + writePos_;
}

/// Advances the write cursor after the caller wrote len bytes at BeginWrite().
void Buffer::HasWritten(size_t len) {
    writePos_ += len;
}

/// Appends the bytes of str.
void Buffer::Append(const std::string& str) {
    Append(str.data(), str.length());
}

/// Appends len raw bytes starting at data.
void Buffer::Append(const void* data, size_t len) {
    assert(data);
    Append(static_cast<const char*>(data), len);
}

/// Core append: grows the buffer if needed, then copies len bytes from str.
void Buffer::Append(const char* str, size_t len) {
    assert(str);
    EnsureWriteable(len);
    if (len > 0) {
        std::memcpy(BeginWrite(), str, len);
    }
    HasWritten(len);
}

/// Appends the unread bytes of another buffer.
void Buffer::Append(const Buffer& buff) {
    if (&buff == this) {
        // Growing our own storage would invalidate Peek(); copy first.
        const std::string snapshot(Peek(), ReadableBytes());
        Append(snapshot);
        return;
    }
    Append(buff.Peek(), buff.ReadableBytes());
}

/// Guarantees that at least len bytes can be written at BeginWrite().
void Buffer::EnsureWriteable(size_t len) {
    if (WritableBytes() < len) {
        MakeSpace_(len);
    }
    assert(WritableBytes() >= len);
}

/// Reads from fd with a single readv call.
/// Data that does not fit in the free space lands in a stack buffer and is appended afterwards.
/// @return bytes read, or a negative value with *saveErrno set to errno.
ssize_t Buffer::ReadFd(int fd, int* saveErrno) {
    char buff[65535];
    struct iovec iov[2];
    const size_t writable = WritableBytes();
    // Scatter read: first into the free space, overflow into the stack buffer.
    iov[0].iov_base = BeginPtr_() + writePos_;
    iov[0].iov_len = writable;
    iov[1].iov_base = buff;
    iov[1].iov_len = sizeof(buff);

    const ssize_t len = readv(fd, iov, 2);
    if (len < 0) {
        if (saveErrno) {
            *saveErrno = errno;
        }
    } else if (static_cast<size_t>(len) <= writable) {
        writePos_ += len;
    } else {
        // Free space is full; the remainder sits in the stack buffer.
        writePos_ = buffer_.size();
        Append(buff, len - writable);
    }
    return len;
}

/// Writes the unread bytes to fd and consumes what was written.
/// @return bytes written, or a negative value with *saveErrno set to errno.
ssize_t Buffer::WriteFd(int fd, int* saveErrno) {
    const size_t readSize = ReadableBytes();
    const ssize_t len = write(fd, Peek(), readSize);
    if (len < 0) {
        if (saveErrno) {
            *saveErrno = errno;
        }
        return len;
    }
    readPos_ += len;
    return len;
}

/// Start of the backing storage. data() is well-defined even when the vector is empty.
char* Buffer::BeginPtr_() {
    return buffer_.data();
}

/// Start of the backing storage (read-only view).
const char* Buffer::BeginPtr_() const {
    return buffer_.data();
}

/// Makes room for len more bytes: grows the vector when the free and consumed
/// space together are too small, otherwise slides the unread bytes to the front.
void Buffer::MakeSpace_(size_t len) {
    if (WritableBytes() + PrependableBytes() < len) {
        buffer_.resize(writePos_ + len + 1);
    } else {
        const size_t readable = ReadableBytes();
        // Source and destination may overlap, so memmove is required.
        if (readable > 0) {
            std::memmove(BeginPtr_(), BeginPtr_() + readPos_, readable);
        }
        readPos_ = 0;
        writePos_ = readable;
        assert(readable == ReadableBytes());
    }
}
日志系统
log.h
#ifndef LOG_H
#define LOG_H

#include <mutex>
#include <string>
#include <thread>
#include <sys/time.h>
#include <string.h>
#include <stdarg.h>    // va_start, va_end
#include <assert.h>
#include <sys/stat.h>  // mkdir
#include "blockqueue.h"
#include "../buffer/buffer.h"

// Process-wide logger obtained through Instance().
// init() must be called before the LOG_* macros emit anything (they check IsOpen()).
class Log {
public:
    // Sets the level, the log directory, the file suffix and the queue capacity.
    void init(int level, const char* path = "./log",
                const char* suffix =".log",
                int maxQueueCapacity = 1024);

    static Log* Instance();         // accessor for the single logger object
    static void FlushLogThread();   // entry point of the background writer thread

    void write(int level, const char *format,...);  // printf-style log record
    void flush();

    int GetLevel();
    void SetLevel(int level);
    bool IsOpen() { return isOpen_; }

private:
    Log();                                  // private: instances come from Instance()
    void AppendLogLevelTitle_(int level);   // appends the level tag to the buffer
    virtual ~Log();
    void AsyncWrite_();                     // asynchronous write path

private:
    static const int LOG_PATH_LEN = 256;    // log path length
    static const int LOG_NAME_LEN = 256;    // log file name length
    static const int MAX_LINES = 50000;     // maximum number of lines

    const char* path_;      // log directory
    const char* suffix_;    // file suffix

    int MAX_LINES_;         // maximum number of lines

    int lineCount_;         // number of lines written
    int toDay_;             // current day

    bool isOpen_;           // open flag

    Buffer buff_;           // buffer for assembling one record
    int level_;             // log level
    bool isAsync_;          // asynchronous flag

    FILE* fp_;                                          // log file handle
    std::unique_ptr<BlockDeque<std::string>> deque_;    // blocking queue
    std::unique_ptr<std::thread> writeThread_;          // writer thread
    std::mutex mtx_;                                    // mutex
};

// Logs when the logger is open and its level is <= the record's level, then flushes.
// NOTE(review): the expansion ends in "while(0);" - the trailing semicolon means
// "if (x) LOG_INFO(...); else ..." does not compile. Always brace the call site.
#define LOG_BASE(level, format, ...) \
    do {\
        Log* log = Log::Instance();\
        if (log->IsOpen() && log->GetLevel() <= level) {\
            log->write(level, format, ##__VA_ARGS__); \
            log->flush();\
        }\
    } while(0);

// Level shortcuts: 0 = debug, 1 = info, 2 = warn, 3 = error.
// ##__VA_ARGS__ forwards the variadic macro arguments.
#define LOG_DEBUG(format, ...) do {LOG_BASE(0, format, ##__VA_ARGS__)} while(0);
#define LOG_INFO(format, ...) do {LOG_BASE(1, format, ##__VA_ARGS__)} while(0);
#define LOG_WARN(format, ...) do {LOG_BASE(2, format, ##__VA_ARGS__)} while(0);
#define LOG_ERROR(format, ...) do {LOG_BASE(3, format, ##__VA_ARGS__)} while(0);

#endif //LOG_H
log.cpp
#include "log.h"

using namespace std;

namespace {

/// Wakes the async writer (when a queue is given) and flushes the stream.
/// The caller must already hold the logger mutex.
void FlushUnlocked(BlockDeque<std::string>* queue, FILE* fp) {
    if (queue) {
        queue->flush();
    }
    if (fp) {
        fflush(fp);
    }
}

} // namespace

/// Puts every member into a defined "closed" state.
Log::Log() {
    lineCount_ = 0;
    isAsync_ = false;
    writeThread_ = nullptr;
    deque_ = nullptr;
    toDay_ = 0;
    fp_ = nullptr;
    isOpen_ = false;
    level_ = 1;
    MAX_LINES_ = MAX_LINES;
    path_ = nullptr;
    suffix_ = nullptr;
}

/// Drains the queue, stops the writer thread and closes the file.
Log::~Log() {
    if (writeThread_ && writeThread_->joinable()) {
        while (!deque_->empty()) {
            deque_->flush();
        }
        deque_->Close();
        writeThread_->join();
    }
    if (fp_) {
        lock_guard<mutex> locker(mtx_);
        FlushUnlocked(isAsync_ ? deque_.get() : nullptr, fp_);
        fclose(fp_);
        fp_ = nullptr;
    }
}

int Log::GetLevel() {
    lock_guard<mutex> locker(mtx_);
    return level_;
}

void Log::SetLevel(int level) {
    lock_guard<mutex> locker(mtx_);
    level_ = level;
}

/// Opens (or reopens) today's log file and, when maxQueueSize > 0, starts the
/// asynchronous writer with a queue of that capacity.
/// @param level        minimum level that is emitted
/// @param path         log directory (created when missing)
/// @param suffix       file name suffix
/// @param maxQueueSize queue capacity; <= 0 selects synchronous writes
void Log::init(int level = 1, const char* path, const char* suffix,
    int maxQueueSize) {
    if (maxQueueSize > 0) {
        isAsync_ = true;
        if (!deque_) {
            // Honour the requested capacity instead of the queue's built-in default.
            deque_.reset(new BlockDeque<std::string>(static_cast<size_t>(maxQueueSize)));
            writeThread_.reset(new thread(FlushLogThread));
        }
    } else {
        isAsync_ = false;
    }

    time_t timer = time(nullptr);
    struct tm t;
    localtime_r(&timer, &t);   // reentrant: write() runs on many threads

    char fileName[LOG_NAME_LEN] = {0};
    snprintf(fileName, LOG_NAME_LEN - 1, "%s/%04d_%02d_%02d%s",
            path, t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, suffix);

    {
        lock_guard<mutex> locker(mtx_);
        level_ = level;
        lineCount_ = 0;
        path_ = path;
        suffix_ = suffix;
        toDay_ = t.tm_mday;
        buff_.RetrieveAll();
        if (fp_) {
            FlushUnlocked(isAsync_ ? deque_.get() : nullptr, fp_);
            fclose(fp_);
        }

        fp_ = fopen(fileName, "a");
        if (fp_ == nullptr) {
            mkdir(path_, 0777);
            fp_ = fopen(fileName, "a");
        }
        assert(fp_ != nullptr);
        // Only advertise the logger as open once the file handle exists.
        isOpen_ = true;
    }
}

/// Formats one record ("timestamp [level]: message\n") and hands it to the
/// queue (async) or writes it directly. Rotates the file on a new day or
/// every MAX_LINES lines.
void Log::write(int level, const char *format, ...) {
    struct timeval now = {0, 0};
    gettimeofday(&now, nullptr);
    time_t tSec = now.tv_sec;
    struct tm t;
    localtime_r(&tSec, &t);

    // All shared state (counters, file handle, buffer) is touched under the lock.
    unique_lock<mutex> locker(mtx_);

    if (toDay_ != t.tm_mday || (lineCount_ && (lineCount_ % MAX_LINES == 0))) {
        char newFile[LOG_NAME_LEN];
        char tail[36] = {0};
        snprintf(tail, 36, "%04d_%02d_%02d", t.tm_year + 1900, t.tm_mon + 1, t.tm_mday);

        if (toDay_ != t.tm_mday) {
            snprintf(newFile, LOG_NAME_LEN - 72, "%s/%s%s", path_, tail, suffix_);
            toDay_ = t.tm_mday;
            lineCount_ = 0;
        } else {
            snprintf(newFile, LOG_NAME_LEN - 72, "%s/%s-%d%s", path_, tail, (lineCount_ / MAX_LINES), suffix_);
        }

        if (fp_) {
            FlushUnlocked(isAsync_ ? deque_.get() : nullptr, fp_);
            fclose(fp_);
        }
        fp_ = fopen(newFile, "a");
        assert(fp_ != nullptr);
    }

    lineCount_++;

    // Timestamp prefix.
    buff_.EnsureWriteable(128);
    const int n = snprintf(buff_.BeginWrite(), 128, "%d-%02d-%02d %02d:%02d:%02d.%06ld ",
                t.tm_year + 1900, t.tm_mon + 1, t.tm_mday,
                t.tm_hour, t.tm_min, t.tm_sec, static_cast<long>(now.tv_usec));
    if (n > 0) {
        buff_.HasWritten(n < 128 ? n : 127);
    }
    AppendLogLevelTitle_(level);

    // Message body. vsnprintf returns the length it *wanted* to write, so a
    // message larger than the free space needs a grow-and-retry.
    va_list vaList;
    va_start(vaList, format);
    va_list vaRetry;
    va_copy(vaRetry, vaList);
    int m = vsnprintf(buff_.BeginWrite(), buff_.WritableBytes(), format, vaList);
    va_end(vaList);
    if (m >= 0 && static_cast<size_t>(m) >= buff_.WritableBytes()) {
        buff_.EnsureWriteable(static_cast<size_t>(m) + 1);
        m = vsnprintf(buff_.BeginWrite(), buff_.WritableBytes(), format, vaRetry);
    }
    va_end(vaRetry);
    if (m > 0) {
        buff_.HasWritten(m);
    }
    buff_.Append("\n\0", 2);

    if (isAsync_ && deque_ && !deque_->full()) {
        deque_->push_back(buff_.RetrieveAllToStr());
    } else if (fp_) {
        fputs(buff_.Peek(), fp_);
    }
    buff_.RetrieveAll();
}

/// Appends the textual tag for a level; unknown levels are tagged as info.
void Log::AppendLogLevelTitle_(int level) {
    switch(level) {
    case 0:
        buff_.Append("[debug]: ", 9);
        break;
    case 1:
        buff_.Append("[info] : ", 9);
        break;
    case 2:
        buff_.Append("[warn] : ", 9);
        break;
    case 3:
        buff_.Append("[error]: ", 9);
        break;
    default:
        buff_.Append("[info] : ", 9);
        break;
    }
}

/// Wakes the async writer and flushes the file. Takes the lock so it cannot
/// race with a file rotation closing the stream.
void Log::flush() {
    lock_guard<mutex> locker(mtx_);
    FlushUnlocked(isAsync_ ? deque_.get() : nullptr, fp_);
}

/// Writer-thread loop: pops records until the queue reports closure.
void Log::AsyncWrite_() {
    string str = "";
    while (deque_->pop(str)) {
        lock_guard<mutex> locker(mtx_);
        if (fp_) {
            fputs(str.c_str(), fp_);
        }
    }
}

Log* Log::Instance() {
    static Log inst;
    return &inst;
}

void Log::FlushLogThread() {
    Log::Instance()->AsyncWrite_();
}
blockqueue.h
#ifndef BLOCKQUEUE_H
#define BLOCKQUEUE_H

#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>
#include <sys/time.h>

// Bounded blocking deque for producer/consumer hand-off.
// Declaration and implementation live in one header because this is a template.
template<class T>
class BlockDeque {
public:
    explicit BlockDeque(size_t MaxCapacity = 1000);

    ~BlockDeque();

    void clear();

    bool empty();

    bool full();

    // Drops all queued items, marks the queue closed and wakes every waiter.
    void Close();

    size_t size();

    size_t capacity();

    T front();

    T back();

    // Block while the queue is full, then insert and wake one consumer.
    void push_back(const T &item);

    void push_front(const T &item);

    // Blocks until an item is available; returns false once the queue is closed.
    bool pop(T &item);

    // Like pop(T&), but gives up (returns false) after timeout seconds.
    bool pop(T &item, int timeout);

    // Wakes one waiting consumer.
    void flush();

private:
    std::deque<T> deq_;

    size_t capacity_;

    std::mutex mtx_;

    bool isClose_;

    std::condition_variable condConsumer_;

    std::condition_variable condProducer_;
};


template<class T>
BlockDeque<T>::BlockDeque(size_t MaxCapacity) : capacity_(MaxCapacity), isClose_(false) {
    assert(MaxCapacity > 0);
}

template<class T>
BlockDeque<T>::~BlockDeque() {
    Close();
}

template<class T>
void BlockDeque<T>::Close() {
    {
        std::lock_guard<std::mutex> locker(mtx_);
        deq_.clear();
        isClose_ = true;
    }
    condProducer_.notify_all();
    condConsumer_.notify_all();
}

template<class T>
void BlockDeque<T>::flush() {
    condConsumer_.notify_one();
}

template<class T>
void BlockDeque<T>::clear() {
    std::lock_guard<std::mutex> locker(mtx_);
    deq_.clear();
}

template<class T>
T BlockDeque<T>::front() {
    std::lock_guard<std::mutex> locker(mtx_);
    return deq_.front();
}

template<class T>
T BlockDeque<T>::back() {
    std::lock_guard<std::mutex> locker(mtx_);
    return deq_.back();
}

template<class T>
size_t BlockDeque<T>::size() {
    std::lock_guard<std::mutex> locker(mtx_);
    return deq_.size();
}

template<class T>
size_t BlockDeque<T>::capacity() {
    std::lock_guard<std::mutex> locker(mtx_);
    return capacity_;
}

template<class T>
void BlockDeque<T>::push_back(const T &item) {
    std::unique_lock<std::mutex> locker(mtx_);
    // Producers sleep here while the queue is full; pop() wakes them.
    while (deq_.size() >= capacity_) {
        condProducer_.wait(locker);
    }
    deq_.push_back(item);
    condConsumer_.notify_one();
}

template<class T>
void BlockDeque<T>::push_front(const T &item) {
    std::unique_lock<std::mutex> locker(mtx_);
    while (deq_.size() >= capacity_) {
        condProducer_.wait(locker);
    }
    deq_.push_front(item);
    condConsumer_.notify_one();
}

template<class T>
bool BlockDeque<T>::empty() {
    std::lock_guard<std::mutex> locker(mtx_);
    return deq_.empty();
}

template<class T>
bool BlockDeque<T>::full() {
    std::lock_guard<std::mutex> locker(mtx_);
    return deq_.size() >= capacity_;
}

template<class T>
bool BlockDeque<T>::pop(T &item) {
    std::unique_lock<std::mutex> locker(mtx_);
    while (deq_.empty()) {
        // Check *before* sleeping: Close() may already have run, in which case
        // nobody would ever wake this consumer again.
        if (isClose_) {
            return false;
        }
        condConsumer_.wait(locker);
    }
    item = std::move(deq_.front());
    deq_.pop_front();
    condProducer_.notify_one();
    return true;
}

template<class T>
bool BlockDeque<T>::pop(T &item, int timeout) {
    std::unique_lock<std::mutex> locker(mtx_);
    while (deq_.empty()) {
        if (isClose_) {
            return false;
        }
        if (condConsumer_.wait_for(locker, std::chrono::seconds(timeout))
                == std::cv_status::timeout) {
            return false;
        }
    }
    item = std::move(deq_.front());
    deq_.pop_front();
    condProducer_.notify_one();
    return true;
}

#endif // BLOCKQUEUE_H
池
threadpool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Fixed-size pool of detached worker threads draining a shared task queue.
// Workers share ownership of the queue state, so it stays alive until the
// last worker has observed the close flag and exited.
class ThreadPool {
public:
    // Starts threadCount workers. This is also the default constructor; a
    // separate "ThreadPool() = default" would make "ThreadPool p;" ambiguous.
    explicit ThreadPool(size_t threadCount = 8): pool_(std::make_shared<Pool>()) {
        assert(threadCount > 0);
        for (size_t i = 0; i < threadCount; i++) {
            std::thread([pool = pool_] {
                std::unique_lock<std::mutex> locker(pool->mtx);
                while (true) {
                    if (!pool->tasks.empty()) {
                        // The queue is only touched while the lock is held.
                        auto task = std::move(pool->tasks.front());
                        pool->tasks.pop();
                        // Run the task unlocked so other workers can dequeue.
                        locker.unlock();
                        task();
                        locker.lock();
                    }
                    else if (pool->isClosed) break;
                    else pool->cond.wait(locker);   // nothing to do: sleep
                }
            }).detach();   // detached: the pool destructor never blocks on workers
        }
    }

    ThreadPool(ThreadPool&&) = default;

    // Marks the pool closed and wakes all workers; already-queued tasks still run.
    ~ThreadPool() {
        if (static_cast<bool>(pool_)) {
            {
                std::lock_guard<std::mutex> locker(pool_->mtx);
                pool_->isClosed = true;
            }
            pool_->cond.notify_all();
        }
    }

    // Queues a callable and wakes one worker. Must not be called on a moved-from pool.
    template<class F>
    void AddTask(F&& task) {
        assert(pool_);
        {
            std::lock_guard<std::mutex> locker(pool_->mtx);
            pool_->tasks.emplace(std::forward<F>(task));
        }
        pool_->cond.notify_one();
    }

private:
    // State shared between the pool object and its workers.
    struct Pool {
        std::mutex mtx;
        std::condition_variable cond;
        bool isClosed = false;                      // set by the destructor
        std::queue<std::function<void()>> tasks;    // pending tasks
    };
    std::shared_ptr<Pool> pool_;
};

#endif //THREADPOOL_H
sqlconnpool.h
#ifndef SQLCONNPOOL_H
#define SQLCONNPOOL_H

#include <mysql/mysql.h>
#include <string>
#include <queue>
#include <mutex>
#include <semaphore.h>  // POSIX semaphore
#include <thread>
#include "../log/log.h"

// Pool of MySQL connections, accessed through Instance().
class SqlConnPool {
public:
    static SqlConnPool *Instance();     // accessor for the single pool object

    MYSQL *GetConn();                   // takes a connection out of the pool
    void FreeConn(MYSQL * conn);        // hands a connection back to the pool
    int GetFreeConnCount();             // number of connections currently in the pool

    // Opens connSize connections to the given server and database.
    void Init(const char* host, int port,
              const char* user,const char* pwd,
              const char* dbName, int connSize);
    void ClosePool();                   // closes every pooled connection

private:
    SqlConnPool();
    ~SqlConnPool();

    int MAX_CONN_;      // maximum number of connections
    int useCount_;      // connections in use
    int freeCount_;     // idle connections

    std::queue<MYSQL *> connQue_;   // pooled connections
    std::mutex mtx_;                // guards connQue_
    sem_t semId_;                   // semaphore counting available connections
};


#endif // SQLCONNPOOL_H
sqlconnpool.cpp
#include "sqlconnpool.h"
using namespace std;

SqlConnPool::SqlConnPool() {
    useCount_ = 0;
    freeCount_ = 0;
    MAX_CONN_ = 0;
}

/// Returns the process-wide pool (function-local static).
SqlConnPool* SqlConnPool::Instance() {
    static SqlConnPool connPool;
    return &connPool;
}

/// Opens up to connSize connections and sizes the semaphore to the number
/// that actually connected. Failed attempts are logged, released and skipped
/// so the pool never hands out a null connection.
void SqlConnPool::Init(const char* host, int port,
            const char* user,const char* pwd, const char* dbName,
            int connSize = 10) {
    assert(connSize > 0);
    int connected = 0;
    for (int i = 0; i < connSize; i++) {
        MYSQL* handle = mysql_init(nullptr);
        if (!handle) {
            LOG_ERROR("MySql init error!");
            assert(handle);
            continue;
        }
        // mysql_real_connect returns NULL on failure; keep the handle in its
        // own variable so it can still be released.
        if (!mysql_real_connect(handle, host, user, pwd, dbName, port, nullptr, 0)) {
            LOG_ERROR("MySql Connect error!");
            mysql_close(handle);
            continue;
        }
        connQue_.push(handle);
        ++connected;
    }
    MAX_CONN_ = connected;
    // Second argument 0: the semaphore is shared between threads of this process.
    sem_init(&semId_, 0, MAX_CONN_);
}

/// Takes a connection from the pool without blocking.
/// @return a connection, or nullptr when none is available.
MYSQL* SqlConnPool::GetConn() {
    MYSQL* sql = nullptr;
    {
        // The emptiness check and the pop happen under one lock.
        lock_guard<mutex> locker(mtx_);
        if (!connQue_.empty()) {
            // Keeps the semaphore count in step with the queue; never blocks.
            sem_trywait(&semId_);
            sql = connQue_.front();
            connQue_.pop();
        }
    }
    if (!sql) {
        LOG_WARN("SqlConnPool busy!");
    }
    return sql;
}

/// Returns a connection to the pool and signals its availability.
void SqlConnPool::FreeConn(MYSQL* sql) {
    assert(sql);
    lock_guard<mutex> locker(mtx_);
    connQue_.push(sql);
    sem_post(&semId_);
}

/// Closes every pooled connection and releases the client library's resources.
void SqlConnPool::ClosePool() {
    lock_guard<mutex> locker(mtx_);
    while (!connQue_.empty()) {
        MYSQL* item = connQue_.front();
        connQue_.pop();
        mysql_close(item);
    }
    mysql_library_end();
}

/// Number of connections currently sitting in the pool.
int SqlConnPool::GetFreeConnCount() {
    lock_guard<mutex> locker(mtx_);
    return connQue_.size();
}

SqlConnPool::~SqlConnPool() {
    ClosePool();
}
sqlconnRAII.h
#ifndef SQLCONNRAII_H #define SQLCONNRAII_H #include "sqlconnpool.h" /* 资源在对象构造初始化 资源在对象析构时释放*/ class SqlConnRAII { public: SqlConnRAII(MYSQL** sql, SqlConnPool *connpool) { // sql 是传出参数 assert(connpool); *sql = connpool->GetConn(); // SqlConnRAII 的构造函数中 获得连接 sql_ = *sql; connpool_ = connpool; } ~SqlConnRAII() { if(sql_) { connpool_->FreeConn(sql_); } // SqlConnRAII 的析构函数中 释放连接 } private: MYSQL *sql_; // mysql 指针 SqlConnPool* connpool_; // 连接池 指针 }; #endif //SQLCONNRAII_H |
定时器
heaptimer.h
#ifndef HEAP_TIMER_H #define HEAP_TIMER_H #include <queue> #include <unordered_map> #include <time.h> #include <algorithm> #include <arpa/inet.h> #include <functional> #include <assert.h> #include <chrono> // 时间工具 #include "../log/log.h" typedef std::function<void()> TimeoutCallBack; typedef std::chrono::high_resolution_clock Clock; // 时钟 typedef std::chrono::milliseconds MS; // 毫秒 typedef Clock::time_point TimeStamp; // 时间点 struct TimerNode { // 定时器节点 int id; TimeStamp expires; // 过期时间 TimeoutCallBack cb; bool operator<(const TimerNode& t) { return expires < t.expires; } }; class HeapTimer { // 时间堆类 public: HeapTimer() { heap_.reserve(64); } ~HeapTimer() { clear(); } void adjust(int id, int newExpires); void add(int id, int timeOut, const TimeoutCallBack& cb); void doWork(int id); void clear(); void tick(); void pop(); int GetNextTick(); private: void del_(size_t i); // 删除节点 void siftup_(size_t i); // 向上调整 bool siftdown_(size_t index, size_t n); // 向下调整 void SwapNode_(size_t i, size_t j); // 交换节点 std::vector<TimerNode> heap_; // 堆 std::unordered_map<int, size_t> ref_; // id-下标 映射表 }; #endif //HEAP_TIMER_H |
heaptimer.cpp
#include "heaptimer.h"

/// Moves the node at index i towards the root until its parent expires strictly earlier.
void HeapTimer::siftup_(size_t i) {
    assert(i < heap_.size());
    // Stop at the root: computing (i - 1) / 2 for i == 0 would wrap around
    // with an unsigned index and read far outside the heap.
    while (i > 0) {
        const size_t parent = (i - 1) / 2;
        if (heap_[parent] < heap_[i]) { break; }
        SwapNode_(i, parent);
        i = parent;
    }
}

/// Swaps two heap slots and keeps the id -> index map in sync.
void HeapTimer::SwapNode_(size_t i, size_t j) {
    assert(i < heap_.size());
    assert(j < heap_.size());
    std::swap(heap_[i], heap_[j]);
    ref_[heap_[i].id] = i;
    ref_[heap_[j].id] = j;
}

/// Moves the node at index down within the first n slots.
/// @return true when the node changed position.
bool HeapTimer::siftdown_(size_t index, size_t n) {
    assert(index < heap_.size());
    assert(n <= heap_.size());
    size_t i = index;
    size_t j = i * 2 + 1;   // left child
    while (j < n) {
        if (j + 1 < n && heap_[j + 1] < heap_[j]) j++;   // pick the earlier child
        if (heap_[i] < heap_[j]) break;
        SwapNode_(i, j);
        i = j;
        j = i * 2 + 1;
    }
    return i > index;
}

/// Adds a timer firing timeout ms from now, or re-arms and re-targets an existing id.
void HeapTimer::add(int id, int timeout, const TimeoutCallBack& cb) {
    assert(id >= 0);
    const auto it = ref_.find(id);
    if (it == ref_.end()) {
        // New node: append at the tail, then restore heap order.
        const size_t i = heap_.size();
        ref_[id] = i;
        heap_.push_back({id, Clock::now() + MS(timeout), cb});
        siftup_(i);
    } else {
        // Existing node: update in place, then move it whichever way is needed.
        const size_t i = it->second;
        heap_[i].expires = Clock::now() + MS(timeout);
        heap_[i].cb = cb;
        if (!siftdown_(i, heap_.size())) {
            siftup_(i);
        }
    }
}

/// Runs the callback of the timer with this id and removes it. No-op for unknown ids.
void HeapTimer::doWork(int id) {
    const auto it = ref_.find(id);
    if (heap_.empty() || it == ref_.end()) {
        return;
    }
    const TimerNode node = heap_[it->second];
    node.cb();
    // The callback may have touched the heap, so look the index up again.
    const auto after = ref_.find(id);
    if (after != ref_.end()) {
        del_(after->second);
    }
}

/// Removes the node at heap index.
void HeapTimer::del_(size_t index) {
    assert(!heap_.empty() && index < heap_.size());
    // Swap the victim to the tail, fix the heap over the remaining prefix, drop the tail.
    const size_t i = index;
    const size_t n = heap_.size() - 1;
    assert(i <= n);
    if (i < n) {
        SwapNode_(i, n);
        if (!siftdown_(i, n)) {
            siftup_(i);
        }
    }
    ref_.erase(heap_.back().id);
    heap_.pop_back();
}

/// Re-arms the timer with this id to fire timeout ms from now.
void HeapTimer::adjust(int id, int timeout) {
    assert(!heap_.empty() && ref_.count(id) > 0);
    const auto it = ref_.find(id);
    if (it == ref_.end()) {
        return;
    }
    const size_t i = it->second;
    heap_[i].expires = Clock::now() + MS(timeout);
    // The new expiry may be earlier *or* later than before.
    if (!siftdown_(i, heap_.size())) {
        siftup_(i);
    }
}

/// Fires and removes every timer whose expiry has passed.
void HeapTimer::tick() {
    while (!heap_.empty()) {
        const TimerNode node = heap_.front();
        if (std::chrono::duration_cast<MS>(node.expires - Clock::now()).count() > 0) {
            break;
        }
        node.cb();
        // Remove by id: the callback may have reshuffled or emptied the heap.
        const auto it = ref_.find(node.id);
        if (it != ref_.end()) {
            del_(it->second);
        }
    }
}

/// Removes the earliest timer without running its callback.
void HeapTimer::pop() {
    assert(!heap_.empty());
    del_(0);
}

void HeapTimer::clear() {
    ref_.clear();
    heap_.clear();
}

/// Fires expired timers, then reports the wait until the next one.
/// @return milliseconds until the earliest expiry (never negative), or -1 when no timer is pending.
int HeapTimer::GetNextTick() {
    tick();
    int res = -1;
    if (!heap_.empty()) {
        // Signed arithmetic: an unsigned result could never be clamped at zero.
        const auto remaining =
            std::chrono::duration_cast<MS>(heap_.front().expires - Clock::now()).count();
        res = remaining < 0 ? 0 : static_cast<int>(remaining);
    }
    return res;
}
HTTP 连接
httprequest.h
#ifndef HTTP_REQUEST_H
#define HTTP_REQUEST_H

#include <unordered_map>
#include <unordered_set>
#include <string>
#include <regex>          // regular-expression matching
#include <errno.h>
#include <mysql/mysql.h>  // MySQL client API

#include "../buffer/buffer.h"
#include "../log/log.h"
#include "../pool/sqlconnpool.h"
#include "../pool/sqlconnRAII.h"

// Incremental parser and holder for one HTTP request.
class HttpRequest {
public:
    // Parser state: which part of the request is expected next.
    enum PARSE_STATE {
        REQUEST_LINE,
        HEADERS,
        BODY,
        FINISH,
    };

    // Request processing result codes.
    enum HTTP_CODE {
        NO_REQUEST = 0,
        GET_REQUEST,
        BAD_REQUEST,
        NO_RESOURSE,
        FORBIDDENT_REQUEST,
        FILE_REQUEST,
        INTERNAL_ERROR,
        CLOSED_CONNECTION,
    };

    HttpRequest() { Init(); }
    ~HttpRequest() = default;

    void Init();                    // resets all fields for a new request
    bool parse(Buffer& buff);       // parses the request held in buff

    std::string path() const;
    std::string& path();
    std::string method() const;
    std::string version() const;
    std::string GetPost(const std::string& key) const;  // value of a form field, "" when absent
    std::string GetPost(const char* key) const;

    bool IsKeepAlive() const;

    /*
    todo
    void HttpConn::ParseFormData() {}
    void HttpConn::ParseJson() {}
    */

private:
    bool ParseRequestLine_(const std::string& line);
    void ParseHeader_(const std::string& line);
    void ParseBody_(const std::string& line);

    void ParsePath_();
    void ParsePost_();
    void ParseFromUrlencoded_(int tag);

    static bool UserVerify(const std::string& name, const std::string& pwd, bool isLogin);

    PARSE_STATE state_;
    std::string method_, path_, version_, body_;            // method, path, HTTP version, body
    std::unordered_map<std::string, std::string> header_;   // request headers
    std::unordered_map<std::string, std::string> post_;     // parsed form fields

    static const std::unordered_set<std::string> DEFAULT_HTML;
    static const std::unordered_map<std::string, int> DEFAULT_HTML_TAG;
    static int ConverHex(char ch);
};


#endif //HTTP_REQUEST_H
httprequest.cpp
#include "httprequest.h"

#include <algorithm>  // std::search
#include <cstdlib>    // system

using namespace std;

namespace {

/// Numeric value of a hexadecimal digit, or -1 when ch is not one.
int HexDigitValue(char ch) {
    if (ch >= '0' && ch <= '9') return ch - '0';
    if (ch >= 'A' && ch <= 'F') return ch - 'A' + 10;
    if (ch >= 'a' && ch <= 'f') return ch - 'a' + 10;
    return -1;
}

/// Decodes one application/x-www-form-urlencoded component:
/// '+' becomes a space and "%XX" becomes the byte 0xXX. A '%' that is not
/// followed by two hex digits is kept literally.
string UrlDecode(const string& text) {
    string decoded;
    decoded.reserve(text.size());
    for (size_t i = 0; i < text.size(); ++i) {
        const char ch = text[i];
        if (ch == '+') {
            decoded.push_back(' ');
        } else if (ch == '%' && i + 2 < text.size()
                   && HexDigitValue(text[i + 1]) >= 0 && HexDigitValue(text[i + 2]) >= 0) {
            decoded.push_back(static_cast<char>(
                HexDigitValue(text[i + 1]) * 16 + HexDigitValue(text[i + 2])));
            i += 2;
        } else {
            decoded.push_back(ch);
        }
    }
    return decoded;
}

/// Wraps text in single quotes for /bin/sh so it is passed as exactly one
/// argument and cannot inject shell syntax.
string ShellQuote(const string& text) {
    string quoted = "'";
    for (const char ch : text) {
        if (ch == '\'') {
            quoted += "'\\''";   // close quote, escaped quote, reopen quote
        } else {
            quoted.push_back(ch);
        }
    }
    quoted += "'";
    return quoted;
}

/// Escapes raw for use inside a quoted SQL string literal on conn.
/// @return false when the client library reports an error.
bool EscapeSql(MYSQL* conn, const string& raw, string& escaped) {
    escaped.assign(raw.size() * 2 + 1, '\0');   // worst case per the MySQL C API
    const unsigned long written =
        mysql_real_escape_string(conn, &escaped[0], raw.data(), raw.size());
    if (written == static_cast<unsigned long>(-1)) {
        escaped.clear();
        return false;
    }
    escaped.resize(written);
    return true;
}

} // namespace

// Paths that get ".html" appended.
const unordered_set<string> HttpRequest::DEFAULT_HTML{
            "/index", "/register", "/login",
             "/welcome", "/video", "/picture","/sendmail" };

// Pages whose POST body is processed, with their handler tag.
const unordered_map<string, int> HttpRequest::DEFAULT_HTML_TAG {
            {"/register.html", 0}, {"/login.html", 1}, {"/sendmail.html", 2} };

/// Resets the parser for a new request.
void HttpRequest::Init() {
    method_ = path_ = version_ = body_ = "";
    state_ = REQUEST_LINE;
    header_.clear();
    post_.clear();
}

/// True for HTTP/1.1 requests carrying "Connection: keep-alive".
bool HttpRequest::IsKeepAlive() const {
    const auto it = header_.find("Connection");
    if (it != header_.end()) {
        return it->second == "keep-alive" && version_ == "1.1";
    }
    return false;
}

/// Parses the request in buff line by line with a small state machine.
/// @return false when the buffer is empty or the request line is malformed.
bool HttpRequest::parse(Buffer& buff) {
    const char CRLF[] = "\r\n";
    if (buff.ReadableBytes() <= 0) {
        return false;
    }
    while (buff.ReadableBytes() && state_ != FINISH) {
        // [Peek, lineEnd) is the current line without its CRLF.
        const char* lineEnd = search(buff.Peek(), buff.BeginWriteConst(), CRLF, CRLF + 2);
        std::string line(buff.Peek(), lineEnd);
        switch (state_) {
        case REQUEST_LINE:
            if (!ParseRequestLine_(line)) {
                return false;
            }
            ParsePath_();
            break;
        case HEADERS:
            ParseHeader_(line);
            if (buff.ReadableBytes() <= 2) {
                state_ = FINISH;
            }
            break;
        case BODY:
            ParseBody_(line);
            break;
        default:
            break;
        }
        if (lineEnd == buff.BeginWrite()) { break; }   // no CRLF left in the buffer
        buff.RetrieveUntil(lineEnd + 2);
    }
    LOG_DEBUG("[%s], [%s], [%s]", method_.c_str(), path_.c_str(), version_.c_str());
    return true;
}

/// Maps "/" to the index page and appends ".html" to the known short paths.
void HttpRequest::ParsePath_() {
    if (path_ == "/") {
        path_ = "/index.html";
    } else if (DEFAULT_HTML.count(path_) != 0) {
        path_ += ".html";
    }
}

/// Splits "METHOD PATH HTTP/VERSION" and advances to the header state.
bool HttpRequest::ParseRequestLine_(const string& line) {
    // Compiled once; matching against a const regex is safe from several threads.
    static const regex patten("^([^ ]*) ([^ ]*) HTTP/([^ ]*)$");
    smatch subMatch;
    if (regex_match(line, subMatch, patten)) {
        method_ = subMatch[1];
        path_ = subMatch[2];
        version_ = subMatch[3];
        state_ = HEADERS;
        return true;
    }
    LOG_ERROR("RequestLine Error");
    return false;
}

/// Stores one "Name: value" header; a non-matching line ends the header section.
void HttpRequest::ParseHeader_(const string& line) {
    static const regex patten("^([^:]*): ?(.*)$");
    smatch subMatch;
    if (regex_match(line, subMatch, patten)) {
        header_[subMatch[1]] = subMatch[2];
    } else {
        state_ = BODY;
    }
}

/// Stores the body, processes form posts and finishes the request.
void HttpRequest::ParseBody_(const string& line) {
    body_ = line;
    ParsePost_();
    state_ = FINISH;
    // NOTE(review): the body of a login/registration form contains credentials.
    LOG_DEBUG("Body:%s, len:%d", line.c_str(), static_cast<int>(line.size()));
}

/// Value of a hexadecimal digit; any other character is returned unchanged.
int HttpRequest::ConverHex(char ch) {
    if (ch >= '0' && ch <= '9') return ch - '0';
    if (ch >= 'A' && ch <= 'F') return ch -'A' + 10;
    if (ch >= 'a' && ch <= 'f') return ch -'a' + 10;
    return ch;
}

/// Handles url-encoded POSTs to the register, login and sendmail pages and
/// rewrites path_ to the page that reports the outcome.
void HttpRequest::ParsePost_() {
    if (method_ != "POST") {
        return;
    }
    const auto contentType = header_.find("Content-Type");
    if (contentType == header_.end()
            || contentType->second != "application/x-www-form-urlencoded") {
        return;
    }
    const auto tagIt = DEFAULT_HTML_TAG.find(path_);
    if (tagIt == DEFAULT_HTML_TAG.end()) {
        return;
    }
    const int tag = tagIt->second;
    LOG_DEBUG("Tag:%d", tag);

    if (tag == 0 || tag == 1) {
        ParseFromUrlencoded_(tag);
        const bool isLogin = (tag == 1);
        if (UserVerify(post_["username"], post_["password"], isLogin)) {
            path_ = "/welcome.html";
        } else {
            path_ = "/error.html";
        }
    } else if (tag == 2) {
        ParseFromUrlencoded_(tag);
        // The fields come straight from the client: quote each one so it
        // reaches the script as a single argument and cannot run commands.
        string cmd = "python3 /root/MyWebServer/mail.py ";
        cmd += ShellQuote(post_["mailto"]);
        cmd += " ";
        cmd += ShellQuote(post_["subject"]);
        cmd += " ";
        cmd += ShellQuote(post_["content"]);
        if (system(cmd.c_str()) == 0) {
            path_ = "/report.html";
        } else {
            path_ = "/error.html";
        }
    }
}

/// Splits body_ into "key=value" pairs separated by '&' and stores them in post_.
/// Tags 0 and 1 (register / login) are percent-decoded; tag 2 (sendmail)
/// fields are stored exactly as received, as before.
void HttpRequest::ParseFromUrlencoded_(int tag) {
    if (body_.empty()) {
        return;
    }
    const bool decode = (tag == 0 || tag == 1);
    if (!decode && tag != 2) {
        return;
    }

    size_t begin = 0;
    while (begin < body_.size()) {
        size_t end = body_.find('&', begin);
        if (end == string::npos) {
            end = body_.size();
        }
        if (end > begin) {
            const string field = body_.substr(begin, end - begin);
            const size_t eq = field.find('=');
            string key = field.substr(0, eq);
            string value = (eq == string::npos) ? string() : field.substr(eq + 1);
            if (decode) {
                key = UrlDecode(key);
                value = UrlDecode(value);
            }
            post_[key] = value;
        }
        begin = end + 1;
    }
}

/// Checks a login or performs a registration against the user table.
/// @param isLogin true: succeed when the stored password matches;
///                false: succeed when the name is free and the insert works.
/// NOTE(review): passwords are compared and stored in plain text, as the
/// existing schema expects - hashing needs a schema migration.
bool HttpRequest::UserVerify(const string &name, const string &pwd, bool isLogin) {
    if (name == "" || pwd == "") { return false; }
    LOG_INFO("Verify name:%s", name.c_str());

    MYSQL* sql = nullptr;
    // Named guard: holds the connection until this function returns and is
    // the only place that hands it back to the pool.
    SqlConnRAII connGuard(&sql, SqlConnPool::Instance());
    assert(sql);
    if (!sql) {
        return false;
    }

    string safeName;
    string safePwd;
    if (!EscapeSql(sql, name, safeName) || !EscapeSql(sql, pwd, safePwd)) {
        return false;
    }

    // Look the user up.
    const string select =
        "SELECT username, password FROM user WHERE username='" + safeName + "' LIMIT 1";
    LOG_DEBUG("%s", select.c_str());
    if (mysql_query(sql, select.c_str())) {   // non-zero means failure
        return false;
    }
    MYSQL_RES* res = mysql_store_result(sql);
    if (!res) {
        return false;
    }

    // Registration succeeds unless a row with this name exists.
    bool flag = !isLogin;
    while (MYSQL_ROW row = mysql_fetch_row(res)) {
        LOG_DEBUG("MYSQL ROW: %s", row[0] ? row[0] : "");
        if (isLogin) {
            const string password(row[1] ? row[1] : "");
            flag = (pwd == password);
            if (!flag) {
                LOG_DEBUG("pwd error!");
            }
        } else {
            flag = false;
            LOG_DEBUG("user used!");
        }
    }
    mysql_free_result(res);

    // Registration with a free user name: insert the new row.
    if (!isLogin && flag) {
        LOG_DEBUG("regirster!");
        const string insert =
            "INSERT INTO user(username, password) VALUES('" + safeName + "','" + safePwd + "')";
        if (mysql_query(sql, insert.c_str())) {
            LOG_DEBUG( "Insert error!");
            flag = false;
        }
    }
    LOG_DEBUG( "UserVerify success!!");
    return flag;
}

std::string HttpRequest::path() const{
    return path_;
}

std::string& HttpRequest::path(){
    return path_;
}

std::string HttpRequest::method() const {
    return method_;
}

std::string HttpRequest::version() const {
    return version_;
}

/// Value of a parsed form field, or "" when the field is absent.
std::string HttpRequest::GetPost(const std::string& key) const {
    assert(key != "");
    const auto it = post_.find(key);
    if (it != post_.end()) {
        return it->second;
    }
    return "";
}

std::string HttpRequest::GetPost(const char* key) const {
    assert(key != nullptr);
    const auto it = post_.find(key);
    if (it != post_.end()) {
        return it->second;
    }
    return "";
}
httpresponse.h
#ifndef HTTP_RESPONSE_H
#define HTTP_RESPONSE_H

#include <unordered_map>
#include <fcntl.h>       // open
#include <unistd.h>      // close
#include <sys/stat.h>    // stat
#include <sys/mman.h>    // mmap, munmap

#include "../buffer/buffer.h"
#include "../log/log.h"

// Builds the status line and headers of an HTTP response into a Buffer and
// exposes the response file as a memory mapping.
class HttpResponse {
public:
    HttpResponse();
    ~HttpResponse();

    // Prepares a response for path below srcDir; code -1 leaves the status to MakeResponse.
    void Init(const std::string& srcDir, std::string& path, bool isKeepAlive = false, int code = -1);
    void MakeResponse(Buffer& buff);    // picks the status code and writes status line, headers and content length
    void UnmapFile();                   // releases the file mapping, if any
    char* File();                       // start of the mapped file
    size_t FileLen() const;             // size of the mapped file
    void ErrorContent(Buffer& buff, std::string message);   // writes an HTML error body
    int Code() const { return code_; }

private:
    void AddStateLine_(Buffer &buff);
    void AddHeader_(Buffer &buff);
    void AddContent_(Buffer &buff);

    void ErrorHtml_();              // switches path_ to the error page for code_, when one is registered
    std::string GetFileType_();     // MIME type derived from the path suffix

    int code_;              // HTTP status code, -1 while undecided
    bool isKeepAlive_;

    std::string path_;      // resource path relative to srcDir_
    std::string srcDir_;    // root directory of the served files

    char* mmFile_;              // mapped file, nullptr when nothing is mapped
    struct stat mmFileStat_;    // stat data of the served file

    static const std::unordered_map<std::string, std::string> SUFFIX_TYPE;  // file suffix -> MIME type
    static const std::unordered_map<int, std::string> CODE_STATUS;          // status code -> reason phrase
    static const std::unordered_map<int, std::string> CODE_PATH;            // status code -> error page path
};


#endif //HTTP_RESPONSE_H
httpresponse.cpp
#include "httpresponse.h"

using namespace std;

// File suffix (including the leading dot) -> value sent in the Content-type header.
const unordered_map<string, string> HttpResponse::SUFFIX_TYPE = {
    { ".html",  "text/html" },
    { ".xml",   "text/xml" },
    { ".xhtml", "application/xhtml+xml" },
    { ".txt",   "text/plain" },
    { ".rtf",   "application/rtf" },
    { ".pdf",   "application/pdf" },
    { ".word",  "application/nsword" },
    { ".png",   "image/png" },
    { ".gif",   "image/gif" },
    { ".jpg",   "image/jpeg" },
    { ".jpeg",  "image/jpeg" },
    { ".au",    "audio/basic" },
    { ".mpeg",  "video/mpeg" },
    { ".mpg",   "video/mpeg" },
    { ".avi",   "video/x-msvideo" },
    { ".gz",    "application/x-gzip" },
    { ".tar",   "application/x-tar" },
    { ".css",   "text/css "},
    { ".js",    "text/javascript "},
};

// Status code -> reason phrase used in the status line.
const unordered_map<int, string> HttpResponse::CODE_STATUS = {
    { 200, "OK" },
    { 400, "Bad Request" },
    { 403, "Forbidden" },
    { 404, "Not Found" },
};

// Status code -> error page (relative to srcDir_) served instead of the requested file.
const unordered_map<int, string> HttpResponse::CODE_PATH = {
    { 400, "/400.html" },
    { 403, "/403.html" },
    { 404, "/404.html" },
};

/** Creates an empty response with no status code and no mapped file. */
HttpResponse::HttpResponse() {
    code_ = -1;
    path_ = srcDir_ = "";
    isKeepAlive_ = false;
    mmFile_ = nullptr;
    mmFileStat_ = { 0 };
}

/** Releases the mapped file, if one is still mapped. */
HttpResponse::~HttpResponse() {
    UnmapFile();
}

/**
 * Prepares the object for a new response.
 * @param srcDir       resource root directory; must not be empty
 * @param path         request path relative to srcDir
 * @param isKeepAlive  whether the connection is kept alive
 * @param code         initial status code, -1 to let MakeResponse decide
 */
void HttpResponse::Init(const string& srcDir, string& path, bool isKeepAlive, int code) {
    assert(srcDir != "");
    if(mmFile_) { UnmapFile(); }    // drop the mapping left over from a previous response
    code_ = code;
    isKeepAlive_ = isKeepAlive;
    path_ = path;
    srcDir_ = srcDir;
    mmFile_ = nullptr;
    mmFileStat_ = { 0 };
}

/**
 * Chooses the final status code from the state of the requested file and
 * appends status line, headers and Content-length to buff.
 */
void HttpResponse::MakeResponse(Buffer& buff) {
    if(stat((srcDir_ + path_).data(), &mmFileStat_) < 0 || S_ISDIR(mmFileStat_.st_mode)) {
        code_ = 404;    // missing file, or the path names a directory
    }
    else if(!(mmFileStat_.st_mode & S_IROTH)) {
        code_ = 403;    // file is not readable by "others"
    }
    else if(code_ == -1) {
        code_ = 200;
    }
    ErrorHtml_();           // switch to the error page when code_ has one
    AddStateLine_(buff);
    AddHeader_(buff);
    AddContent_(buff);
}

/** Start of the mapped file, or nullptr when nothing is mapped. */
char* HttpResponse::File() {
    return mmFile_;
}

/** Size recorded by the last stat() call on the served file. */
size_t HttpResponse::FileLen() const {
    return mmFileStat_.st_size;
}

/** If code_ has a dedicated error page, serves that page instead of the requested path. */
void HttpResponse::ErrorHtml_() {
    const auto page = CODE_PATH.find(code_);
    if(page == CODE_PATH.end()) {
        return;
    }
    path_ = page->second;
    if(stat((srcDir_ + path_).data(), &mmFileStat_) < 0) {
        // The error page itself is missing: do not keep the size of the
        // originally requested file around.
        mmFileStat_ = { 0 };
    }
}

/** Appends "HTTP/1.1 <code> <reason>"; an unknown code is reported as 400. */
void HttpResponse::AddStateLine_(Buffer& buff) {
    string status;
    const auto known = CODE_STATUS.find(code_);
    if(known != CODE_STATUS.end()) {
        status = known->second;
    }
    else {
        code_ = 400;
        status = CODE_STATUS.find(400)->second;
    }
    buff.Append("HTTP/1.1 " + to_string(code_) + " " + status + "\r\n");
}

/** Appends the Connection (and keep-alive) header and the Content-type header. */
void HttpResponse::AddHeader_(Buffer& buff) {
    buff.Append("Connection: ");
    if(isKeepAlive_) {
        buff.Append("keep-alive\r\n");
        buff.Append("keep-alive: max=6, timeout=120\r\n");
    } else {
        buff.Append("close\r\n");
    }
    buff.Append("Content-type: " + GetFileType_() + "\r\n");
}

/**
 * Maps the served file into memory and appends its Content-length header.
 * On any failure a generated error page is appended through ErrorContent.
 */
void HttpResponse::AddContent_(Buffer& buff) {
    const string fullPath = srcDir_ + path_;
    const int srcFd = open(fullPath.data(), O_RDONLY);
    if(srcFd < 0) {
        ErrorContent(buff, "File NotFound!");
        return;
    }
    LOG_DEBUG("file path %s", fullPath.data());

    if(mmFileStat_.st_size == 0) {
        // mmap() rejects a zero length, so an empty file is answered with an
        // empty body instead of being treated as an error.
        close(srcFd);
        buff.Append("Content-length: 0\r\n\r\n");
        return;
    }

    /* MAP_PRIVATE creates a private copy-on-write mapping of the file. */
    void* const mmRet = mmap(nullptr, mmFileStat_.st_size, PROT_READ, MAP_PRIVATE, srcFd, 0);
    close(srcFd);   // the mapping stays valid after the descriptor is closed
    if(mmRet == MAP_FAILED) {
        // mmap reports failure through MAP_FAILED; the returned pointer must
        // never be dereferenced to check for an error.
        ErrorContent(buff, "File NotFound!");
        return;
    }
    mmFile_ = static_cast<char*>(mmRet);
    buff.Append("Content-length: " + to_string(mmFileStat_.st_size) + "\r\n\r\n");
}

/** Unmaps the file if one is mapped; safe to call repeatedly. */
void HttpResponse::UnmapFile() {
    if(mmFile_) {
        munmap(mmFile_, mmFileStat_.st_size);
        mmFile_ = nullptr;
    }
}

/** Returns the MIME type for path_'s suffix, "text/plain" when unknown or absent. */
string HttpResponse::GetFileType_() {
    const string::size_type idx = path_.find_last_of('.');
    if(idx == string::npos) {
        return "text/plain";
    }
    const auto type = SUFFIX_TYPE.find(path_.substr(idx));
    if(type != SUFFIX_TYPE.end()) {
        return type->second;
    }
    return "text/plain";
}

/**
 * Appends a generated HTML error body, preceded by its Content-length header.
 * @param message  text shown in the body paragraph
 */
void HttpResponse::ErrorContent(Buffer& buff, string message) {
    string body;
    string status;
    body += "<html><title>Error</title>";
    body += "<body bgcolor=\"ffffff\">";
    const auto known = CODE_STATUS.find(code_);
    if(known != CODE_STATUS.end()) {
        status = known->second;
    } else {
        status = "Bad Request";
    }
    body += to_string(code_) + " : " + status + "\n";
    body += "<p>" + message + "</p>";
    body += "<hr><em>TinyWebServer</em></body></html>";

    buff.Append("Content-length: " + to_string(body.size()) + "\r\n\r\n");
    buff.Append(body);
}
httpconn.h
#ifndef HTTP_CONN_H #define HTTP_CONN_H #include <sys/types.h> #include <sys/uio.h> // readv/writev #include <arpa/inet.h> // sockaddr_in #include <stdlib.h> // atoi() #include <errno.h> #include "../log/log.h" #include "../pool/sqlconnRAII.h" #include "../buffer/buffer.h" #include "httprequest.h" #include "httpresponse.h" class HttpConn { public: HttpConn(); ~HttpConn(); void init(int sockFd, const sockaddr_in& addr); ssize_t read(int* saveErrno); ssize_t write(int* saveErrno); void Close(); int GetFd() const; int GetPort() const; const char* GetIP() const; sockaddr_in GetAddr() const; bool process(); int ToWriteBytes() { return iov_[0].iov_len + iov_[1].iov_len; } bool IsKeepAlive() const { return request_.IsKeepAlive(); } static bool isET; static const char* srcDir; // 资源目录 static std::atomic<int> userCount; private: int fd_; struct sockaddr_in addr_; bool isClose_; int iovCnt_; struct iovec iov_[2]; Buffer readBuff_; // 读缓冲区 Buffer writeBuff_; // 写缓冲区 HttpRequest request_; // 请求 HttpResponse response_; // 响应 }; #endif //HTTP_CONN_H |
httpconn.cpp
#include "httpconn.h"

using namespace std;

const char* HttpConn::srcDir;
std::atomic<int> HttpConn::userCount;
bool HttpConn::isET;

/** Creates a closed connection with an empty write vector. */
HttpConn::HttpConn() {
    fd_ = -1;
    addr_ = { 0 };
    isClose_ = true;
    // ToWriteBytes() and writev() read iov_, so it must never be indeterminate.
    iovCnt_ = 0;
    iov_[0].iov_base = nullptr;
    iov_[0].iov_len = 0;
    iov_[1].iov_base = nullptr;
    iov_[1].iov_len = 0;
}

/** Closes the connection if it is still open. */
HttpConn::~HttpConn() {
    Close();
}

/**
 * Binds this object to an accepted socket.
 * @param fd    connected socket descriptor, must be > 0
 * @param addr  peer address
 */
void HttpConn::init(int fd, const sockaddr_in& addr) {
    assert(fd > 0);
    userCount++;
    addr_ = addr;
    fd_ = fd;
    writeBuff_.RetrieveAll();
    readBuff_.RetrieveAll();
    // The object may be reused for a new client: forget any pending write
    // state left behind by the previous one.
    iovCnt_ = 0;
    iov_[0].iov_base = nullptr;
    iov_[0].iov_len = 0;
    iov_[1].iov_base = nullptr;
    iov_[1].iov_len = 0;
    isClose_ = false;
    LOG_INFO("Client[%d](%s:%d) in, userCount:%d", fd_, GetIP(), GetPort(), (int)userCount);
}

/** Unmaps the response file and, if the socket is still open, closes it once. */
void HttpConn::Close() {
    response_.UnmapFile();
    if(isClose_ == false) {
        isClose_ = true;
        userCount--;
        close(fd_);
        LOG_INFO("Client[%d](%s:%d) quit, UserCount:%d", fd_, GetIP(), GetPort(), (int)userCount);
    }
}

int HttpConn::GetFd() const {
    return fd_;
}

struct sockaddr_in HttpConn::GetAddr() const {
    return addr_;
}

/** Peer address in dotted-decimal form (inet_ntoa returns a static buffer). */
const char* HttpConn::GetIP() const {
    return inet_ntoa(addr_.sin_addr);
}

/** Peer port in host byte order; sin_port itself is stored in network byte order. */
int HttpConn::GetPort() const {
    return ntohs(addr_.sin_port);
}

/**
 * Reads from the socket into the read buffer, repeatedly when isET is set.
 * @param saveErrno  receives errno from Buffer::ReadFd
 * @return result of the last ReadFd call
 */
ssize_t HttpConn::read(int* saveErrno) {
    ssize_t len = -1;
    do {
        len = readBuff_.ReadFd(fd_, saveErrno);
        if(len <= 0) {
            break;
        }
    } while(isET);
    return len;
}

/**
 * Writes the pending response (headers in iov_[0], file in iov_[1]) and
 * advances both entries past whatever was sent.
 * @param saveErrno  receives errno when writev returns <= 0
 * @return result of the last writev call
 */
ssize_t HttpConn::write(int* saveErrno) {
    ssize_t len = -1;
    do {
        len = writev(fd_, iov_, iovCnt_);
        if(len <= 0) {
            *saveErrno = errno;
            break;
        }
        if(iov_[0].iov_len + iov_[1].iov_len == 0) { break; }   /* transfer finished */
        else if(static_cast<size_t>(len) > iov_[0].iov_len) {
            // Headers fully sent and part of the file as well: advance the file entry.
            const size_t fileSent = len - iov_[0].iov_len;
            iov_[1].iov_base = static_cast<uint8_t*>(iov_[1].iov_base) + fileSent;
            iov_[1].iov_len -= fileSent;
            if(iov_[0].iov_len) {
                writeBuff_.RetrieveAll();
                iov_[0].iov_len = 0;
            }
        }
        else {
            // Headers only partially sent: advance the header entry and the buffer.
            iov_[0].iov_base = static_cast<uint8_t*>(iov_[0].iov_base) + len;
            iov_[0].iov_len -= len;
            writeBuff_.Retrieve(len);
        }
    } while(isET || ToWriteBytes() > 10240);
    return len;
}

/**
 * Parses the buffered request and builds the response into the write buffer
 * and iov_.
 * @return false when the read buffer is empty, true once a response is ready
 */
bool HttpConn::process() {
    request_.Init();
    if(readBuff_.ReadableBytes() <= 0) {
        return false;
    }
    else if(request_.parse(readBuff_)) {
        LOG_DEBUG("%s", request_.path().c_str());
        response_.Init(srcDir, request_.path(), request_.IsKeepAlive(), 200);
    } else {
        response_.Init(srcDir, request_.path(), false, 400);
    }

    response_.MakeResponse(writeBuff_);

    /* Response headers */
    iov_[0].iov_base = const_cast<char*>(writeBuff_.Peek());
    iov_[0].iov_len = writeBuff_.ReadableBytes();
    iovCnt_ = 1;

    // Always clear the file entry first: without a mapped file it would
    // otherwise keep an old (or never initialised) length that
    // ToWriteBytes() adds to the total.
    iov_[1].iov_base = nullptr;
    iov_[1].iov_len = 0;

    /* File body */
    if(response_.FileLen() > 0 && response_.File()) {
        iov_[1].iov_base = response_.File();
        iov_[1].iov_len = response_.FileLen();
        iovCnt_ = 2;
    }
    // FileLen() is a size_t; cast so the argument matches the %d conversion.
    LOG_DEBUG("filesize:%d, %d  to %d", static_cast<int>(response_.FileLen()), iovCnt_, ToWriteBytes());
    return true;
}
Server
epoller.h
#ifndef EPOLLER_H
#define EPOLLER_H

#include <sys/epoll.h>  // epoll_ctl()
#include <fcntl.h>      // fcntl()
#include <unistd.h>     // close()
#include <assert.h>     // assert()
#include <vector>
#include <errno.h>

/** Thin wrapper around an epoll instance and the array that receives its ready events. */
class Epoller {
public:
    /** @param maxEvent  capacity of the ready-event array passed to epoll_wait */
    explicit Epoller(int maxEvent = 1024);

    ~Epoller();

    /** Registers fd for the given events; false on a negative fd or epoll_ctl failure. */
    bool AddFd(int fd, uint32_t events);

    /** Changes the events fd is registered for; false on a negative fd or epoll_ctl failure. */
    bool ModFd(int fd, uint32_t events);

    /** Removes fd from the epoll instance; false on a negative fd or epoll_ctl failure. */
    bool DelFd(int fd);

    /** Waits for events; returns the epoll_wait result. timeoutMs of -1 blocks indefinitely. */
    int Wait(int timeoutMs = -1);

    /** Descriptor stored in the i-th ready event. */
    int GetEventFd(size_t i) const;

    /** Event mask of the i-th ready event. */
    uint32_t GetEvents(size_t i) const;

private:
    int epollFd_;                               // epoll instance descriptor

    std::vector<struct epoll_event> events_;    // ready events filled in by Wait()
};

#endif //EPOLLER_H
epoller.cpp
#include "epoller.h"

/** Creates the epoll instance and sizes the ready-event array to maxEvent entries. */
Epoller::Epoller(int maxEvent)
    : epollFd_(epoll_create(512)),
      events_(maxEvent) {
    assert(epollFd_ >= 0 && events_.size() > 0);
}

/** Closes the epoll descriptor. */
Epoller::~Epoller() {
    close(epollFd_);
}

/** Registers fd for `events`; returns true when epoll_ctl succeeds. */
bool Epoller::AddFd(int fd, uint32_t events) {
    if(fd < 0) {
        return false;
    }
    epoll_event request = {0};
    request.data.fd = fd;
    request.events = events;
    const int rc = epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &request);
    return rc == 0;
}

/** Replaces the event mask registered for fd; returns true when epoll_ctl succeeds. */
bool Epoller::ModFd(int fd, uint32_t events) {
    if(fd < 0) {
        return false;
    }
    epoll_event request = {0};
    request.data.fd = fd;
    request.events = events;
    const int rc = epoll_ctl(epollFd_, EPOLL_CTL_MOD, fd, &request);
    return rc == 0;
}

/** Removes fd from the epoll instance; returns true when epoll_ctl succeeds. */
bool Epoller::DelFd(int fd) {
    if(fd < 0) {
        return false;
    }
    epoll_event unused = {0};   // passed zeroed to epoll_ctl, as before
    const int rc = epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, &unused);
    return rc == 0;
}

/** Waits up to timeoutMs milliseconds and returns the epoll_wait result. */
int Epoller::Wait(int timeoutMs) {
    const int capacity = static_cast<int>(events_.size());
    return epoll_wait(epollFd_, &events_[0], capacity, timeoutMs);
}

/** Descriptor stored in the i-th ready event; i must be within the event array. */
int Epoller::GetEventFd(size_t i) const {
    // i is unsigned, so only the upper bound needs checking.
    assert(i < events_.size());
    return events_[i].data.fd;
}

/** Event mask of the i-th ready event; i must be within the event array. */
uint32_t Epoller::GetEvents(size_t i) const {
    assert(i < events_.size());
    return events_[i].events;
}
webserver.h
#ifndef WEBSERVER_H #define WEBSERVER_H #include <unordered_map> #include <fcntl.h> // fcntl() #include <unistd.h> // close() #include <assert.h> #include <errno.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include "epoller.h" #include "../log/log.h" #include "../timer/heaptimer.h" #include "../pool/sqlconnpool.h" #include "../pool/threadpool.h" #include "../pool/sqlconnRAII.h" #include "../http/httpconn.h" class WebServer { public: WebServer( int port, int trigMode, int timeoutMS, bool OptLinger, int sqlPort, const char* sqlUser, const char* sqlPwd, const char* dbName, int connPoolNum, int threadNum, bool openLog, int logLevel, int logQueSize); ~WebServer(); void Start(); private: bool InitSocket_(); void InitEventMode_(int trigMode); void AddClient_(int fd, sockaddr_in addr); void DealListen_(); void DealWrite_(HttpConn* client); void DealRead_(HttpConn* client); void SendError_(int fd, const char*info); void ExtentTime_(HttpConn* client); void CloseConn_(HttpConn* client); void OnRead_(HttpConn* client); void OnWrite_(HttpConn* client); void OnProcess(HttpConn* client); static const int MAX_FD = 65536; static int SetFdNonblock(int fd); int port_; bool openLinger_; int timeoutMS_; /* 毫秒MS */ bool isClose_; int listenFd_; char* srcDir_; uint32_t listenEvent_; uint32_t connEvent_; std::unique_ptr<HeapTimer> timer_; std::unique_ptr<ThreadPool> threadpool_; std::unique_ptr<Epoller> epoller_; std::unordered_map<int, HttpConn> users_; }; #endif //WEBSERVER_H |
webserver.cpp
#include "webserver.h"

using namespace std;

/**
 * Builds the server: resolves the resource directory, initialises the SQL
 * connection pool, selects the epoll trigger mode, creates the listening
 * socket and, when requested, starts the log system.
 * If the listening socket cannot be set up the server is marked closed and
 * Start() returns immediately.
 */
WebServer::WebServer(
            int port, int trigMode, int timeoutMS, bool OptLinger,
            int sqlPort, const char* sqlUser, const char* sqlPwd,
            const char* dbName, int connPoolNum, int threadNum,
            bool openLog, int logLevel, int logQueSize):
            port_(port), openLinger_(OptLinger), timeoutMS_(timeoutMS), isClose_(false),
            listenFd_(-1),  // stays -1 until InitSocket_ has created the socket
            timer_(new HeapTimer()), threadpool_(new ThreadPool(threadNum)), epoller_(new Epoller())
{
    /* Resource directory = current working directory + "/resources/". */
    static const char kResourceSuffix[] = "/resources/";
    char* cwd = getcwd(nullptr, 256);
    assert(cwd);
    // Grow the buffer to fit the suffix exactly; appending in place to the
    // 256-byte buffer could overflow it for a long working directory.
    const size_t cwdLen = strlen(cwd);
    srcDir_ = static_cast<char*>(realloc(cwd, cwdLen + sizeof(kResourceSuffix)));
    assert(srcDir_);
    memcpy(srcDir_ + cwdLen, kResourceSuffix, sizeof(kResourceSuffix));   // copies the trailing NUL too

    HttpConn::userCount = 0;
    HttpConn::srcDir = srcDir_;
    SqlConnPool::Instance()->Init("localhost", sqlPort, sqlUser, sqlPwd, dbName, connPoolNum);

    InitEventMode_(trigMode);
    if(!InitSocket_()) { isClose_ = true; }

    if(openLog) {
        Log::Instance()->init(logLevel, "./log", ".log", logQueSize);
        if(isClose_) { LOG_ERROR("========== Server init error!=========="); }
        else {
            LOG_INFO("========== Server init ==========");
            LOG_INFO("Port:%d, OpenLinger: %s", port_, OptLinger? "true":"false");
            LOG_INFO("Listen Mode: %s, OpenConn Mode: %s",
                            (listenEvent_ & EPOLLET ? "ET": "LT"),
                            (connEvent_ & EPOLLET ? "ET": "LT"));
            LOG_INFO("LogSys level: %d", logLevel);
            LOG_INFO("srcDir: %s", HttpConn::srcDir);
            LOG_INFO("SqlConnPool num: %d, ThreadPool num: %d", connPoolNum, threadNum);
        }
    }
}

/** Closes the listening socket (if one is open), frees srcDir_ and shuts the SQL pool down. */
WebServer::~WebServer() {
    if(listenFd_ >= 0) {
        close(listenFd_);
        listenFd_ = -1;
    }
    isClose_ = true;
    free(srcDir_);
    SqlConnPool::Instance()->ClosePool();
}

/**
 * Selects level- or edge-triggered epoll for the listening socket and for
 * client sockets.
 * @param trigMode  0: neither ET, 1: clients ET, 2: listener ET,
 *                  3 or any other value: both ET
 */
void WebServer::InitEventMode_(int trigMode) {
    listenEvent_ = EPOLLRDHUP;
    connEvent_ = EPOLLONESHOT | EPOLLRDHUP;     // EPOLLONESHOT: an event is delivered once until re-armed
    switch (trigMode)
    {
    case 0:
        break;
    case 1:
        connEvent_ |= EPOLLET;
        break;
    case 2:
        listenEvent_ |= EPOLLET;
        break;
    case 3:
    default:
        listenEvent_ |= EPOLLET;
        connEvent_ |= EPOLLET;
        break;
    }
    HttpConn::isET = (connEvent_ & EPOLLET);
}

/** Event loop: waits on epoll and dispatches accept/close/read/write events until closed. */
void WebServer::Start() {
    int timeMS = -1;  /* epoll wait timeout == -1: block until an event arrives */
    if(!isClose_) { LOG_INFO("========== Server start =========="); }
    while(!isClose_) {
        if(timeoutMS_ > 0) {
            timeMS = timer_->GetNextTick();
        }
        const int eventCnt = epoller_->Wait(timeMS);
        for(int i = 0; i < eventCnt; i++) {
            const int fd = epoller_->GetEventFd(i);
            const uint32_t events = epoller_->GetEvents(i);
            if(fd == listenFd_) {
                DealListen_();
            }
            else if(events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) {
                assert(users_.count(fd) > 0);
                CloseConn_(&users_[fd]);
            }
            else if(events & EPOLLIN) {
                assert(users_.count(fd) > 0);
                DealRead_(&users_[fd]);
            }
            else if(events & EPOLLOUT) {
                assert(users_.count(fd) > 0);
                DealWrite_(&users_[fd]);
            } else {
                LOG_ERROR("Unexpected event");
            }
        }
    }
}

/** Sends `info` to a socket that is being rejected, then closes it. */
void WebServer::SendError_(int fd, const char*info) {
    assert(fd > 0);
    const int ret = send(fd, info, strlen(info), 0);
    if(ret < 0) {
        LOG_WARN("send error to client[%d] error!", fd);
    }
    close(fd);
}

/** Removes the client from epoll and closes its connection. */
void WebServer::CloseConn_(HttpConn* client) {
    assert(client);
    LOG_INFO("Client[%d] quit!", client->GetFd());
    epoller_->DelFd(client->GetFd());
    client->Close();
}

/**
 * Sets up a freshly accepted client: initialises its HttpConn, arms its
 * timeout and registers it for read events.
 */
void WebServer::AddClient_(int fd, sockaddr_in addr) {
    assert(fd > 0);
    HttpConn& client = users_[fd];
    client.init(fd, addr);
    if(timeoutMS_ > 0) {
        // The timer callback closes the connection when it expires.
        timer_->add(fd, timeoutMS_, std::bind(&WebServer::CloseConn_, this, &client));
    }
    // Make the socket non-blocking BEFORE it becomes visible to epoll, so a
    // worker can never run the read loop on a blocking descriptor.
    SetFdNonblock(fd);
    epoller_->AddFd(fd, EPOLLIN | connEvent_);
    LOG_INFO("Client[%d] in!", client.GetFd());
}

/** Accepts pending connections; loops until accept fails when the listener is edge-triggered. */
void WebServer::DealListen_() {
    do {
        struct sockaddr_in addr;
        socklen_t len = sizeof(addr);   // value-result argument: reset for every accept()
        const int fd = accept(listenFd_, (struct sockaddr *)&addr, &len);
        if(fd <= 0) { return; }
        else if(HttpConn::userCount >= MAX_FD) {
            SendError_(fd, "Server busy!");
            LOG_WARN("Clients is full!");
            return;
        }
        AddClient_(fd, addr);
    } while(listenEvent_ & EPOLLET);
}

/** Refreshes the client's timeout and queues a read task on the thread pool. */
void WebServer::DealRead_(HttpConn* client) {
    assert(client);
    ExtentTime_(client);
    threadpool_->AddTask(std::bind(&WebServer::OnRead_, this, client));
}

/** Refreshes the client's timeout and queues a write task on the thread pool. */
void WebServer::DealWrite_(HttpConn* client) {
    assert(client);
    ExtentTime_(client);
    threadpool_->AddTask(std::bind(&WebServer::OnWrite_, this, client));
}

/** Pushes the client's timer forward by timeoutMS_ when timeouts are enabled. */
void WebServer::ExtentTime_(HttpConn* client) {
    assert(client);
    if(timeoutMS_ > 0) { timer_->adjust(client->GetFd(), timeoutMS_); }
}

/** Thread-pool task: reads the request; closes the connection on a non-EAGAIN failure. */
void WebServer::OnRead_(HttpConn* client) {
    assert(client);
    int readErrno = 0;
    const int ret = client->read(&readErrno);
    if(ret <= 0 && readErrno != EAGAIN) {
        CloseConn_(client);
        return;
    }
    OnProcess(client);
}

/** Re-arms the client for writing when a response is ready, for reading otherwise. */
void WebServer::OnProcess(HttpConn* client) {
    if(client->process()) {
        epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLOUT);
    } else {
        epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLIN);
    }
}

/**
 * Thread-pool task: writes the response. A finished keep-alive response goes
 * back to OnProcess, EAGAIN re-arms for writing, everything else closes.
 */
void WebServer::OnWrite_(HttpConn* client) {
    assert(client);
    int writeErrno = 0;
    const int ret = client->write(&writeErrno);
    if(client->ToWriteBytes() == 0) {
        /* transfer complete */
        if(client->IsKeepAlive()) {
            OnProcess(client);
            return;
        }
    }
    else if(ret < 0) {
        if(writeErrno == EAGAIN) {
            /* socket buffer full: continue when writable again */
            epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLOUT);
            return;
        }
    }
    CloseConn_(client);
}

/**
 * Creates, configures, binds and registers the listening socket.
 * @return true on success; on failure the socket is closed, listenFd_ is
 *         reset to -1 and false is returned
 */
bool WebServer::InitSocket_() {
    int ret;
    struct sockaddr_in addr = {};   // zero every field, including padding
    if(port_ > 65535 || port_ < 80) {
        LOG_ERROR("Port:%d error!",  port_);
        return false;
    }
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);   // host to network byte order
    addr.sin_port = htons(port_);               // host to network byte order

    struct linger optLinger = { 0 };
    if(openLinger_) {
        /* Linger on close: wait until remaining data is sent or the timeout expires */
        optLinger.l_onoff = 1;
        optLinger.l_linger = 1;
    }

    listenFd_ = socket(AF_INET, SOCK_STREAM, 0);
    if(listenFd_ < 0) {
        LOG_ERROR("Create socket error!", port_);
        return false;
    }

    ret = setsockopt(listenFd_, SOL_SOCKET, SO_LINGER, &optLinger, sizeof(optLinger));
    if(ret < 0) {
        LOG_ERROR("Init linger error!", port_);
        close(listenFd_);
        listenFd_ = -1;     // prevents a second close() in the destructor
        return false;
    }

    int optval = 1;
    /* Allow the address to be reused */
    ret = setsockopt(listenFd_, SOL_SOCKET, SO_REUSEADDR, (const void*)&optval, sizeof(int));
    if(ret == -1) {
        LOG_ERROR("set socket setsockopt error !");
        close(listenFd_);
        listenFd_ = -1;
        return false;
    }

    ret = bind(listenFd_, (struct sockaddr *)&addr, sizeof(addr));
    if(ret < 0) {
        LOG_ERROR("Bind Port:%d error!", port_);
        close(listenFd_);
        listenFd_ = -1;
        return false;
    }

    ret = listen(listenFd_, 6);
    if(ret < 0) {
        LOG_ERROR("Listen port:%d error!", port_);
        close(listenFd_);
        listenFd_ = -1;
        return false;
    }

    if(!epoller_->AddFd(listenFd_,  listenEvent_ | EPOLLIN)) {
        LOG_ERROR("Add listen error!");
        close(listenFd_);
        listenFd_ = -1;
        return false;
    }
    SetFdNonblock(listenFd_);
    LOG_INFO("Server port:%d", port_);
    return true;
}

/**
 * Adds O_NONBLOCK to the descriptor's file status flags.
 * @return the fcntl(F_SETFL) result, or -1 when the current flags cannot be read
 */
int WebServer::SetFdNonblock(int fd) {
    assert(fd > 0);
    // File status flags are read with F_GETFL; F_GETFD returns the
    // descriptor flags (FD_CLOEXEC), which is a different set.
    const int flags = fcntl(fd, F_GETFL, 0);
    if(flags < 0) {
        return -1;
    }
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}