注意:这篇文章上次更新于966天前,文章内容可能已经过时。
WebServer
缓冲区 Buffer
buffer.h
#ifndef BUFFER_H
#define BUFFER_H
#include <cstring> //perror
#include <iostream>
#include <unistd.h> // write
#include <sys/uio.h> //readv
#include <vector> //readv
#include <atomic> // 原子类型
#include <assert.h>
/// Growable byte buffer backed by a std::vector<char> with separate read and
/// write cursors (both stored as offsets into the vector).
///
/// Storage layout:
///   [0, readPos_)           bytes already consumed ("prependable")
///   [readPos_, writePos_)   bytes available for reading
///   [writePos_, size())     free space available for writing
///
/// NOTE(review): the cursors are std::atomic, but multi-step operations on the
/// buffer are not synchronized here — confirm before sharing one Buffer
/// between threads.
class Buffer {
public:
    /// @param initBuffSize initial size of the underlying storage, in bytes.
    Buffer(int initBuffSize = 1024);
    ~Buffer() = default;

    /// Free bytes after the write cursor (total size - write offset).
    size_t WritableBytes() const;
    /// Unread bytes (write offset - read offset).
    size_t ReadableBytes() const ;
    /// Bytes in front of the read cursor (the read offset itself).
    size_t PrependableBytes() const;

    /// Pointer to the first unread byte.
    const char* Peek() const;
    /// Makes sure at least len bytes can be written.
    void EnsureWriteable(size_t len);
    /// Advances the write cursor after len bytes were written externally.
    void HasWritten(size_t len);

    /// Advances the read cursor by len bytes.
    void Retrieve(size_t len);
    /// Advances the read cursor up to the address end.
    void RetrieveUntil(const char* end);
    /// Resets the buffer to the empty state.
    void RetrieveAll() ;
    /// Returns the unread content as a string and resets the buffer.
    std::string RetrieveAllToStr();

    /// First writable position (read-only view).
    const char* BeginWriteConst() const;
    /// First writable position.
    char* BeginWrite();

    void Append(const std::string& str);
    /// Core overload; the other three forward to this one. Copies len bytes
    /// starting at str into the buffer.
    void Append(const char* str, size_t len);
    void Append(const void* data, size_t len);
    void Append(const Buffer& buff);

    /// Reads from file descriptor fd into the buffer.
    ssize_t ReadFd(int fd, int* Errno);
    /// Writes the unread content of the buffer to file descriptor fd.
    ssize_t WriteFd(int fd, int* Errno);

private:
    /// Address of the first byte of the storage.
    char* BeginPtr_();
    const char* BeginPtr_() const;
    /// Frees up or allocates room for len more bytes.
    void MakeSpace_(size_t len);

    std::vector<char> buffer_;           ///< storage, initially initBuffSize bytes
    std::atomic<std::size_t> readPos_;   ///< read offset into buffer_
    std::atomic<std::size_t> writePos_;  ///< write offset into buffer_
};
#endif //BUFFER_H
buffer.cpp
#include "buffer.h"
/// Creates the storage with initBuffSize bytes and puts both cursors at offset 0.
Buffer::Buffer(int initBuffSize)
    : buffer_(initBuffSize),
      readPos_(0),
      writePos_(0) {
}
/// Number of bytes between the read cursor and the write cursor.
size_t Buffer::ReadableBytes() const {
    const size_t writeOffset = writePos_;
    const size_t readOffset = readPos_;
    return writeOffset - readOffset;
}

/// Number of bytes between the write cursor and the end of the storage.
size_t Buffer::WritableBytes() const {
    const size_t total = buffer_.size();
    return total - writePos_;
}

/// Number of bytes in front of the read cursor.
size_t Buffer::PrependableBytes() const {
    return readPos_.load();
}

/// Address of the first unread byte.
const char* Buffer::Peek() const {
    const char* const base = BeginPtr_();
    return base + readPos_;
}
/// Marks len bytes as consumed; len must not exceed ReadableBytes().
void Buffer::Retrieve(size_t len) {
    assert(len <= ReadableBytes());
    readPos_.fetch_add(len);
}

/// Consumes everything up to (not including) end, which must not lie before
/// the current read position.
void Buffer::RetrieveUntil(const char* end) {
    const char* const head = Peek();
    assert(head <= end );
    Retrieve(end - head);
}
/// Resets the buffer: zeroes the whole storage and rewinds both cursors to 0.
///
/// Uses vector::assign instead of bzero(&buffer_[0], ...): bzero is a
/// non-standard function whose header is not included by this file, and
/// taking &buffer_[0] is undefined when the vector is empty.
void Buffer::RetrieveAll() {
    buffer_.assign(buffer_.size(), '\0');
    readPos_ = 0;
    writePos_ = 0;
}
/// Copies the unread bytes into a string, then resets the buffer.
std::string Buffer::RetrieveAllToStr() {
    const char* const first = Peek();
    std::string snapshot(first, first + ReadableBytes());
    RetrieveAll();  // the copy is taken, so the storage can be cleared
    return snapshot;
}
/// Read-only pointer to the first writable byte.
const char* Buffer::BeginWriteConst() const {
    const char* const base = BeginPtr_();
    return base + writePos_;
}

/// Mutable pointer to the first writable byte.
char* Buffer::BeginWrite() {
    char* const base = BeginPtr_();
    return base + writePos_;
}

/// Advances the write cursor by len bytes. No bounds check is performed here.
void Buffer::HasWritten(size_t len) {
    writePos_.fetch_add(len);
}
/// Appends the bytes of str.
void Buffer::Append(const std::string& str) {
    const char* const bytes = str.data();
    Append(bytes, str.length());
}

/// Appends len raw bytes starting at data (must be non-null).
void Buffer::Append(const void* data, size_t len) {
    assert(data);
    const char* const bytes = static_cast<const char*>(data);
    Append(bytes, len);
}

/// Core overload: ensures room for len bytes, copies them to the write
/// position and advances the write cursor.
void Buffer::Append(const char* str, size_t len) {
    assert(str);
    EnsureWriteable(len);
    const char* const last = str + len;
    std::copy(str, last, BeginWrite());
    HasWritten(len);
}

/// Appends the unread content of another buffer.
void Buffer::Append(const Buffer& buff) {
    const size_t pending = buff.ReadableBytes();
    Append(buff.Peek(), pending);
}
/// Guarantees that at least len bytes are writable, making space when the
/// region after the write cursor is too small.
void Buffer::EnsureWriteable(size_t len) {
    const bool enoughRoom = WritableBytes() >= len;
    if (!enoughRoom) {
        MakeSpace_(len);
    }
    assert(WritableBytes() >= len);
}
ssize_t Buffer::ReadFd(int fd, int* saveErrno) {
char buff[65535];
struct iovec iov[2];
const size_t writable = WritableBytes();
/* 分散读, 保证数据全部读完 */
iov[0].iov_base = BeginPtr_() + writePos_;
iov[0].iov_len = writable;
iov[1].iov_base = buff;
iov[1].iov_len = sizeof(buff);
const ssize_t len = readv(fd, iov, 2);
if(len < 0) { // 读错误
*saveErrno = errno;
}
else if(static_cast<size_t>(len) <= writable) { // 读取字节数小于缓冲区剩余可写字节数
writePos_ += len;
}
else { // buff 中有数据
writePos_ = buffer_.size(); // 已经写满,写偏移到达结尾位置
Append(buff, len - writable); // 将 buff 中的数据写入缓冲区
}
return len; // 返回读取字节长度
}
/// Writes the unread bytes to fd with a single write() call and consumes
/// however many bytes were accepted.
///
/// @param saveErrno receives errno when write fails.
/// @return the value returned by write (negative on error).
ssize_t Buffer::WriteFd(int fd, int* saveErrno) {
    const size_t pending = ReadableBytes();
    const ssize_t sent = write(fd, Peek(), pending);
    if (sent >= 0) {
        readPos_ += sent;
    } else {
        *saveErrno = errno;
    }
    return sent;
}
/// Address of the first byte of the storage.
///
/// vector::data() is used instead of &*begin(): dereferencing begin() is
/// undefined when the vector is empty (e.g. constructed with size 0), whereas
/// data() is always valid to call.
char* Buffer::BeginPtr_() {
    return buffer_.data();
}

/// Const overload of BeginPtr_().
const char* Buffer::BeginPtr_() const {
    return buffer_.data();
}
/// Makes room for len more bytes.
///
/// If the free tail plus the already-consumed head cannot hold len bytes the
/// vector is resized to writePos_ + len + 1; otherwise the unread bytes are
/// moved to the front of the storage so that the free space is contiguous.
void Buffer::MakeSpace_(size_t len) {
    const bool canCompact = WritableBytes() + PrependableBytes() >= len;
    if (!canCompact) {
        buffer_.resize(writePos_ + len + 1);
        return;
    }

    const size_t pending = ReadableBytes();
    char* const base = BeginPtr_();
    std::copy(base + readPos_, base + writePos_, base);  // slide unread bytes to offset 0
    readPos_ = 0;
    writePos_ = pending;
    assert(pending == ReadableBytes());
}
日志系统
log.h
#ifndef LOG_H
#define LOG_H
#include <mutex>
#include <string>
#include <thread>
#include <sys/time.h>
#include <string.h>
#include <stdarg.h> // vastart va_end
#include <assert.h>
#include <sys/stat.h> //mkdir
#include "blockqueue.h"
#include "../buffer/buffer.h"
/// Process-wide logger obtained through Log::Instance().
///
/// Lines are formatted into buff_ and either written directly to fp_ or, in
/// asynchronous mode, pushed onto deque_ and written by writeThread_.
///
/// The scalar members carry default member initializers so that IsOpen() and
/// GetLevel(), which the LOG_* macros call before anything else, never read an
/// indeterminate value when init() has not run yet.
class Log {
public:
    /// Opens the log file and selects synchronous / asynchronous mode.
    void init(int level, const char* path = "./log",
              const char* suffix =".log",
              int maxQueueCapacity = 1024);

    static Log* Instance();
    /// Entry point of the background writer thread.
    static void FlushLogThread();

    /// Formats one printf-style log line with the given level.
    void write(int level, const char *format,...);
    void flush();

    int GetLevel();
    void SetLevel(int level);
    bool IsOpen() { return isOpen_; }

private:
    Log();                                  // private: use Instance()
    void AppendLogLevelTitle_(int level);
    virtual ~Log();
    void AsyncWrite_();                     // body of the background writer

private:
    static const int LOG_PATH_LEN = 256;    // maximum log path length
    static const int LOG_NAME_LEN = 256;    // maximum log file name length
    static const int MAX_LINES = 50000;     // lines per file before rotating

    const char* path_ = nullptr;            // log directory
    const char* suffix_ = nullptr;          // file name suffix

    int MAX_LINES_ = MAX_LINES;             // not referenced by the code visible here

    int lineCount_ = 0;                     // lines written so far
    int toDay_ = 0;                         // day of month of the current file

    bool isOpen_ = false;                   // set by init()

    Buffer buff_;                           // formatting buffer
    int level_ = 1;                         // minimum level that is emitted
    bool isAsync_ = false;                  // asynchronous mode flag

    FILE* fp_ = nullptr;                    // current log file
    std::unique_ptr<BlockDeque<std::string>> deque_;  // queue feeding the writer thread
    std::unique_ptr<std::thread> writeThread_;        // background writer thread
    std::mutex mtx_;                        // guards the members above
};
// Emits one log line when the logger is open and `level` is at or above the
// configured level, then flushes.
//
// The macros expand to a single `do { ... } while (0)` statement WITHOUT a
// trailing semicolon, so `if (c) LOG_INFO("x"); else ...` parses correctly.
// `level` is parenthesised, and the temporary is named logInstance_ so that it
// does not hide identifiers such as ::log used in the caller's arguments.
// `##__VA_ARGS__` drops the preceding comma when no variadic arguments are given.
#define LOG_BASE(level, format, ...) \
    do { \
        Log* logInstance_ = Log::Instance(); \
        if (logInstance_->IsOpen() && logInstance_->GetLevel() <= (level)) { \
            logInstance_->write((level), format, ##__VA_ARGS__); \
            logInstance_->flush(); \
        } \
    } while (0)

#define LOG_DEBUG(format, ...) LOG_BASE(0, format, ##__VA_ARGS__)
#define LOG_INFO(format, ...)  LOG_BASE(1, format, ##__VA_ARGS__)
#define LOG_WARN(format, ...)  LOG_BASE(2, format, ##__VA_ARGS__)
#define LOG_ERROR(format, ...) LOG_BASE(3, format, ##__VA_ARGS__)
#endif //LOG_H
log.cpp
#include "log.h"
using namespace std;
/// Puts the logger into a well-defined "closed" state.
///
/// Every scalar member is initialised (the original left isOpen_, level_,
/// path_, suffix_ and MAX_LINES_ indeterminate although IsOpen()/GetLevel()
/// may be called before init()). Initialisers follow declaration order.
Log::Log()
    : path_(nullptr),
      suffix_(nullptr),
      MAX_LINES_(MAX_LINES),
      lineCount_(0),
      toDay_(0),
      isOpen_(false),
      level_(1),
      isAsync_(false),
      fp_(nullptr),
      deque_(nullptr),
      writeThread_(nullptr) {
}
/// Drains the queue, stops the writer thread, then flushes and closes the file.
Log::~Log() {
    const bool hasWriter = writeThread_ && writeThread_->joinable();
    if (hasWriter) {
        // Keep waking the consumer until everything queued has been taken.
        while (!deque_->empty()) {
            deque_->flush();
        }
        deque_->Close();
        writeThread_->join();
    }
    if (fp_ != nullptr) {
        lock_guard<mutex> guard(mtx_);
        flush();
        fclose(fp_);
    }
}
int Log::GetLevel() {
lock_guard<mutex> locker(mtx_);
return level_;
}
void Log::SetLevel(int level) {
lock_guard<mutex> locker(mtx_);
level_ = level;
}
/// Configures the logger and opens today's log file
/// "<path>/<YYYY>_<MM>_<DD><suffix>" in append mode.
///
/// @param level         minimum level that will be emitted.
/// @param path          log directory; created with mkdir if the first fopen fails.
///                      The pointer is stored, not copied.
/// @param suffix        file name suffix; the pointer is stored, not copied.
/// @param maxQueueSize  > 0 enables asynchronous mode with a queue of that
///                      capacity; otherwise lines are written synchronously.
///
/// Changes from the previous version:
///  - maxQueueSize is now passed to the queue (it used to be ignored);
///  - localtime_r replaces the non-reentrant localtime;
///  - isOpen_ becomes true only after the file was actually opened, so the
///    LOG_* macros cannot write through a null FILE*.
void Log::init(int level = 1, const char* path, const char* suffix,
               int maxQueueSize) {
    level_ = level;
    if (maxQueueSize > 0) {
        isAsync_ = true;
        if (!deque_) {
            deque_.reset(new BlockDeque<std::string>(static_cast<size_t>(maxQueueSize)));
            writeThread_.reset(new thread(FlushLogThread));
        }
    } else {
        isAsync_ = false;
    }

    lineCount_ = 0;

    const time_t timer = time(nullptr);
    struct tm t;
    localtime_r(&timer, &t);
    path_ = path;
    suffix_ = suffix;
    char fileName[LOG_NAME_LEN] = {0};
    snprintf(fileName, sizeof(fileName), "%s/%04d_%02d_%02d%s",
             path_, t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, suffix_);
    toDay_ = t.tm_mday;

    {
        lock_guard<mutex> locker(mtx_);
        buff_.RetrieveAll();
        if (fp_) {
            // Re-initialisation: finish the previous file first.
            flush();
            fclose(fp_);
        }

        fp_ = fopen(fileName, "a");
        if (fp_ == nullptr) {
            // Most likely the directory is missing; create it and retry once.
            mkdir(path_, 0777);
            fp_ = fopen(fileName, "a");
        }
        assert(fp_ != nullptr);
        isOpen_ = (fp_ != nullptr);
    }
}
/// Formats one log line ("<timestamp> <level tag> <message>\n") and either
/// queues it for the writer thread or writes it to the file directly.
///
/// The file is rotated first when the day of month changed or every
/// MAX_LINES lines.
///
/// Changes from the previous version:
///  - the rotation check and update of toDay_/lineCount_ now happen under
///    mtx_ (they used to run after an explicit unlock);
///  - the message is re-formatted into a grown buffer when vsnprintf reports
///    truncation, and a negative result is ignored, so the write cursor can
///    no longer move past the end of the buffer;
///  - localtime_r replaces the non-reentrant localtime;
///  - the rotated file name may use the whole newFile array.
///
/// @param level   numeric level, mapped to a tag by AppendLogLevelTitle_.
/// @param format  printf-style format string followed by its arguments.
void Log::write(int level, const char *format, ...) {
    struct timeval now = {0, 0};   // seconds and microseconds
    gettimeofday(&now, nullptr);
    const time_t tSec = now.tv_sec;
    struct tm t;
    localtime_r(&tSec, &t);

    unique_lock<mutex> locker(mtx_);

    /* rotate by date or by line count */
    if (toDay_ != t.tm_mday || (lineCount_ && (lineCount_ % MAX_LINES == 0))) {
        char newFile[LOG_NAME_LEN];
        char tail[36] = {0};
        snprintf(tail, sizeof(tail), "%04d_%02d_%02d", t.tm_year + 1900, t.tm_mon + 1, t.tm_mday);

        if (toDay_ != t.tm_mday) {
            snprintf(newFile, sizeof(newFile), "%s/%s%s", path_, tail, suffix_);
            toDay_ = t.tm_mday;
            lineCount_ = 0;
        } else {
            snprintf(newFile, sizeof(newFile), "%s/%s-%d%s", path_, tail, (lineCount_ / MAX_LINES), suffix_);
        }

        flush();
        fclose(fp_);
        fp_ = fopen(newFile, "a");
        assert(fp_ != nullptr);
    }

    lineCount_++;

    // Timestamp prefix; it is far shorter than the 128 bytes reserved for it.
    buff_.EnsureWriteable(128);
    const int n = snprintf(buff_.BeginWrite(), 128, "%d-%02d-%02d %02d:%02d:%02d.%06ld ",
                           t.tm_year + 1900, t.tm_mon + 1, t.tm_mday,
                           t.tm_hour, t.tm_min, t.tm_sec, static_cast<long>(now.tv_usec));
    if (n > 0) {
        buff_.HasWritten(static_cast<size_t>(n));
    }
    AppendLogLevelTitle_(level);

    va_list vaList;
    va_list vaRetry;
    va_start(vaList, format);
    va_copy(vaRetry, vaList);   // a va_list can be consumed only once
    int m = vsnprintf(buff_.BeginWrite(), buff_.WritableBytes(), format, vaList);
    va_end(vaList);
    if (m >= 0 && static_cast<size_t>(m) >= buff_.WritableBytes()) {
        // Output was truncated: grow and format again.
        buff_.EnsureWriteable(static_cast<size_t>(m) + 1);
        m = vsnprintf(buff_.BeginWrite(), buff_.WritableBytes(), format, vaRetry);
    }
    va_end(vaRetry);
    if (m > 0) {
        buff_.HasWritten(static_cast<size_t>(m));
    }

    // The explicit NUL lets fputs() below treat the buffer as a C string.
    buff_.Append("\n\0", 2);

    if (isAsync_ && deque_ && !deque_->full()) {
        deque_->push_back(buff_.RetrieveAllToStr());
    } else {
        fputs(buff_.Peek(), fp_);
    }
    buff_.RetrieveAll();
}
/// Appends the 9-character tag for the given level to the formatting buffer.
/// Levels 0..3 map to debug/info/warn/error; any other value uses the info tag.
void Log::AppendLogLevelTitle_(int level) {
    static const char* const kTitles[] = {
        "[debug]: ",
        "[info] : ",
        "[warn] : ",
        "[error]: ",
    };
    const bool known = level >= 0 && level <= 3;
    const char* const title = known ? kTitles[level] : kTitles[1];
    buff_.Append(title, 9);
}
/// In asynchronous mode wakes the queue consumer; always flushes the file stream.
void Log::flush() {
    if (isAsync_) {
        deque_->flush();  // notifies one waiting consumer
    }
    fflush(fp_);
}
void Log::AsyncWrite_() {
string str = "";
while(deque_->pop(str)) {
lock_guard<mutex> locker(mtx_);
fputs(str.c_str(), fp_);
}
}
Log* Log::Instance() {
static Log inst;
return &inst;
}
void Log::FlushLogThread() {
Log::Instance()->AsyncWrite_();
}
blockqueue.h
#ifndef BLOCKQUEUE_H
#define BLOCKQUEUE_H
#include <mutex> // 互斥锁
#include <deque>
#include <condition_variable>
#include <sys/time.h>
// 类模板
// 声明和实现写在一个文件里免得出错
/// Bounded blocking double-ended queue.
///
/// Producers block in push_back/push_front while the queue holds capacity_
/// elements; consumers block in pop while it is empty. Declaration and
/// definitions live in the same header because this is a class template.
template<class T>
class BlockDeque {
public:
    /// @param MaxCapacity maximum number of queued elements (must be > 0).
    explicit BlockDeque(size_t MaxCapacity = 1000);
    ~BlockDeque();

    // --- state ---
    void clear();
    bool empty();
    bool full();
    void Close();
    size_t size();
    size_t capacity();

    // --- element access (by copy) ---
    T front();
    T back();

    // --- producers ---
    void push_back(const T &item);
    void push_front(const T &item);

    // --- consumers ---
    /// Pops the front element into item.
    bool pop(T &item);
    /// Same as pop(item) but waits at most timeout seconds per wait.
    bool pop(T &item, int timeout);

    /// Wakes one waiting consumer.
    void flush();

private:
    std::deque<T> deq_;                     // queued elements
    size_t capacity_;                       // maximum number of elements
    std::mutex mtx_;                        // guards deq_ and isClose_
    bool isClose_;                          // set by Close()
    std::condition_variable condConsumer_;  // consumers wait here
    std::condition_variable condProducer_;  // producers wait here
};
/// Creates an open queue that holds at most MaxCapacity elements.
template<class T>
BlockDeque<T>::BlockDeque(size_t MaxCapacity)
    : capacity_(MaxCapacity),
      isClose_(false) {
    assert(MaxCapacity > 0);
}

/// Closes the queue on destruction.
template<class T>
BlockDeque<T>::~BlockDeque() {
    Close();
}
/// Discards all queued elements, marks the queue closed and wakes every
/// waiting producer and consumer.
template<class T>
void BlockDeque<T>::Close() {
    std::unique_lock<std::mutex> guard(mtx_);
    deq_.clear();
    isClose_ = true;
    guard.unlock();  // notify without holding the mutex

    condProducer_.notify_all();
    condConsumer_.notify_all();
}

/// Wakes one waiting consumer.
template<class T>
void BlockDeque<T>::flush() {
    condConsumer_.notify_one();
}
/// Removes every queued element.
template<class T>
void BlockDeque<T>::clear() {
    std::lock_guard<std::mutex> guard(mtx_);
    deq_.clear();
}

/// Returns a copy of the first element.
template<class T>
T BlockDeque<T>::front() {
    std::lock_guard<std::mutex> guard(mtx_);
    return deq_.front();
}

/// Returns a copy of the last element.
template<class T>
T BlockDeque<T>::back() {
    std::lock_guard<std::mutex> guard(mtx_);
    return deq_.back();
}

/// Number of queued elements.
template<class T>
size_t BlockDeque<T>::size() {
    std::lock_guard<std::mutex> guard(mtx_);
    const size_t count = deq_.size();
    return count;
}

/// Maximum number of elements the queue accepts.
template<class T>
size_t BlockDeque<T>::capacity() {
    std::lock_guard<std::mutex> guard(mtx_);
    const size_t limit = capacity_;
    return limit;
}
/// Appends item at the back, blocking while the queue is full, then wakes
/// one consumer.
template<class T>
void BlockDeque<T>::push_back(const T &item) {
    std::unique_lock<std::mutex> guard(mtx_);
    // Producers are woken by pop() once an element has been removed.
    condProducer_.wait(guard, [this] { return deq_.size() < capacity_; });
    deq_.push_back(item);
    condConsumer_.notify_one();
}

/// Inserts item at the front, blocking while the queue is full, then wakes
/// one consumer.
template<class T>
void BlockDeque<T>::push_front(const T &item) {
    std::unique_lock<std::mutex> guard(mtx_);
    condProducer_.wait(guard, [this] { return deq_.size() < capacity_; });
    deq_.push_front(item);
    condConsumer_.notify_one();
}
/// True when no element is queued.
template<class T>
bool BlockDeque<T>::empty() {
    std::lock_guard<std::mutex> guard(mtx_);
    const bool none = deq_.empty();
    return none;
}

/// True when the queue holds at least capacity_ elements.
template<class T>
bool BlockDeque<T>::full(){
    std::lock_guard<std::mutex> guard(mtx_);
    const bool atLimit = deq_.size() >= capacity_;
    return atLimit;
}
/// Removes the front element into item, blocking while the queue is empty.
///
/// The closed flag is checked BEFORE every wait. Previously it was only
/// checked after waking up, so a consumer that called pop() after Close() had
/// already run would wait forever (nobody notifies again).
///
/// @return true when an element was stored in item; false when the queue is
///         empty and has been closed.
template<class T>
bool BlockDeque<T>::pop(T &item) {
    std::unique_lock<std::mutex> locker(mtx_);
    while (deq_.empty()) {
        if (isClose_) {
            return false;
        }
        condConsumer_.wait(locker);
    }
    item = deq_.front();
    deq_.pop_front();
    condProducer_.notify_one();  // a slot is free again
    return true;
}

/// Same as pop(item) but each wait is limited to timeout seconds.
///
/// @return true when an element was stored in item; false when a wait timed
///         out or the queue is empty and has been closed.
template<class T>
bool BlockDeque<T>::pop(T &item, int timeout) {
    std::unique_lock<std::mutex> locker(mtx_);
    while (deq_.empty()) {
        if (isClose_) {
            return false;
        }
        if (condConsumer_.wait_for(locker, std::chrono::seconds(timeout))
                == std::cv_status::timeout) {
            return false;
        }
    }
    item = deq_.front();
    deq_.pop_front();
    condProducer_.notify_one();
    return true;
}
#endif // BLOCKQUEUE_H
池
threadpool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <functional>
/// Fixed-size pool of detached worker threads that run queued
/// std::function<void()> tasks.
///
/// The shared state lives in a Pool object held by shared_ptr; every worker
/// keeps its own reference, so the state stays valid after the ThreadPool
/// object itself is destroyed.
///
/// Changes from the previous version:
///  - the extra `ThreadPool() = default;` declaration is gone: together with
///    the constructor below (which has a default argument) it made
///    `ThreadPool pool;` ambiguous;
///  - Pool::isClosed has an explicit initializer instead of relying on how
///    the Pool object happens to be value-initialized.
class ThreadPool {
public:
    /// Starts threadCount workers (must be > 0).
    explicit ThreadPool(size_t threadCount = 8): pool_(std::make_shared<Pool>()) {
        assert(threadCount > 0);
        for (size_t i = 0; i < threadCount; i++) {
            std::thread([pool = pool_] {
                std::unique_lock<std::mutex> locker(pool->mtx);
                while (true) {
                    if (!pool->tasks.empty()) {
                        // The queue is only touched while the lock is held.
                        auto task = std::move(pool->tasks.front());
                        pool->tasks.pop();
                        locker.unlock();
                        task();          // run unlocked so other workers can dequeue
                        locker.lock();
                    }
                    else if (pool->isClosed) break;
                    else pool->cond.wait(locker);   // nothing to do: sleep
                }
            }).detach();
        }
    }

    ThreadPool(ThreadPool&&) = default;

    /// Marks the pool closed and wakes all workers; they exit once the task
    /// queue is empty. A moved-from pool (null pool_) is skipped.
    ~ThreadPool() {
        if (static_cast<bool>(pool_)) {
            {
                std::lock_guard<std::mutex> locker(pool_->mtx);
                pool_->isClosed = true;
            }
            pool_->cond.notify_all();
        }
    }

    /// Queues a callable and wakes one worker.
    template<class F>
    void AddTask(F&& task) {
        {
            std::lock_guard<std::mutex> locker(pool_->mtx);
            pool_->tasks.emplace(std::forward<F>(task));
        }
        pool_->cond.notify_one();
    }

private:
    /// State shared between the owner and the workers.
    struct Pool {
        std::mutex mtx;                              // guards isClosed and tasks
        std::condition_variable cond;                // signalled on new task / close
        bool isClosed = false;                       // set by ~ThreadPool
        std::queue<std::function<void()>> tasks;     // pending tasks
    };
    std::shared_ptr<Pool> pool_;
};
#endif //THREADPOOL_H
sqlconnpool.h
#ifndef SQLCONNPOOL_H
#define SQLCONNPOOL_H
#include <mysql/mysql.h>
#include <string>
#include <queue>
#include <mutex>
#include <semaphore.h> // 信号量
#include <thread>
#include "../log/log.h"
/// Pool of MYSQL connection handles, accessed through Instance().
class SqlConnPool {
public:
    static SqlConnPool *Instance();

    /// Takes a connection out of the pool.
    MYSQL *GetConn();
    /// Puts a connection back into the pool.
    void FreeConn(MYSQL * conn);
    /// Number of connections currently in the pool.
    int GetFreeConnCount();

    /// Opens connSize connections with the given credentials.
    void Init(const char* host, int port,
              const char* user,const char* pwd,
              const char* dbName, int connSize);
    /// Closes every pooled connection.
    void ClosePool();

private:
    SqlConnPool();
    ~SqlConnPool();

    int MAX_CONN_;                   // set by Init()
    int useCount_;                   // zeroed by the constructor
    int freeCount_;                  // zeroed by the constructor

    std::queue<MYSQL *> connQue_;    // idle connections
    std::mutex mtx_;                 // guards connQue_
    sem_t semId_;                    // counting semaphore initialised in Init()
};
#endif // SQLCONNPOOL_H
sqlconnpool.cpp
#include "sqlconnpool.h"
using namespace std;
/// Starts with both counters at zero; connections are created by Init().
SqlConnPool::SqlConnPool()
    : useCount_(0),
      freeCount_(0) {
}

/// Returns the single pool object (a function-local static).
SqlConnPool* SqlConnPool::Instance() {
    static SqlConnPool instance;
    SqlConnPool* const self = &instance;
    return self;
}
void SqlConnPool::Init(const char* host, int port,
const char* user,const char* pwd, const char* dbName,
int connSize = 10) {
assert(connSize > 0);
for (int i = 0; i < connSize; i++) {
MYSQL *sql = nullptr; // 生命 MYSQL 指针
sql = mysql_init(sql); // 初始化 sql 连接
if (!sql) {
LOG_ERROR("MySql init error!");
assert(sql);
}
sql = mysql_real_connect(sql, host, // 连接 mysql
user, pwd,
dbName, port, nullptr, 0);
if (!sql) {
LOG_ERROR("MySql Connect error!");
}
connQue_.push(sql); // 加入到连接队列
}
MAX_CONN_ = connSize;
sem_init(&semId_, 0, MAX_CONN_); // 初始化信号量 第二个参数为 0 时 当前进程的所有线程共享 否则在进程间共享 MAX_CONN_为信号量初始值 可用资源数目
}
/// Takes one connection out of the pool without blocking.
///
/// The emptiness check and the removal now happen under the same lock. The
/// previous version read connQue_ without the mutex and could then block in
/// sem_wait if another thread took the last connection in between.
/// sem_trywait keeps the semaphore count in step with the queue and never
/// blocks.
///
/// @return a connection, or nullptr when the pool is empty.
MYSQL* SqlConnPool::GetConn() {
    MYSQL *sql = nullptr;
    bool taken = false;
    {
        lock_guard<mutex> locker(mtx_);
        if (!connQue_.empty()) {
            sql = connQue_.front();
            connQue_.pop();
            sem_trywait(&semId_);
            taken = true;
        }
    }
    if (!taken) {
        LOG_WARN("SqlConnPool busy!");
    }
    return sql;
}
/// Returns a connection to the pool and increments the semaphore.
void SqlConnPool::FreeConn(MYSQL* sql) {
    assert(sql);
    lock_guard<mutex> guard(mtx_);
    connQue_.push(sql);
    sem_post(&semId_);  // one more connection is available
}
void SqlConnPool::ClosePool() { // 关闭连接池
lock_guard<mutex> locker(mtx_);
while(!connQue_.empty()) {
auto item = connQue_.front();
connQue_.pop();
mysql_close(item); // 逐一关闭连接
}
mysql_library_end(); // mysql使用结束后要调用这个函数 进行一些内存清理
}
int SqlConnPool::GetFreeConnCount() { // 可用连接数量 就是队列的长度
lock_guard<mutex> locker(mtx_);
return connQue_.size();
}
SqlConnPool::~SqlConnPool() {
ClosePool();
}
sqlconnRAII.h
#ifndef SQLCONNRAII_H
#define SQLCONNRAII_H
#include "sqlconnpool.h"
/* 资源在对象构造初始化 资源在对象析构时释放*/
/// Scope guard for a pooled connection: the constructor takes a connection
/// from the pool, the destructor gives it back.
///
/// Copying is disabled — a copy would hand the same connection back to the
/// pool twice.
class SqlConnRAII {
public:
    /// @param sql       out-parameter that receives the connection (may end
    ///                  up nullptr when the pool has none to give).
    /// @param connpool  pool to borrow from; must outlive this guard.
    SqlConnRAII(MYSQL** sql, SqlConnPool *connpool) {
        assert(connpool);
        *sql = connpool->GetConn();
        sql_ = *sql;
        connpool_ = connpool;
    }

    SqlConnRAII(const SqlConnRAII&) = delete;
    SqlConnRAII& operator=(const SqlConnRAII&) = delete;

    /// Returns the connection to the pool if one was obtained.
    ~SqlConnRAII() {
        if (sql_) { connpool_->FreeConn(sql_); }
    }

private:
    MYSQL *sql_;              // borrowed connection, may be nullptr
    SqlConnPool* connpool_;   // pool the connection came from
};
#endif //SQLCONNRAII_H
定时器
heaptimer.h
#ifndef HEAP_TIMER_H
#define HEAP_TIMER_H
#include <queue>
#include <unordered_map>
#include <time.h>
#include <algorithm>
#include <arpa/inet.h>
#include <functional>
#include <assert.h>
#include <chrono> // 时间工具
#include "../log/log.h"
typedef std::function<void()> TimeoutCallBack;        // invoked when a timer fires
typedef std::chrono::high_resolution_clock Clock;     // clock used for deadlines
typedef std::chrono::milliseconds MS;                 // timeout unit
typedef Clock::time_point TimeStamp;                  // absolute deadline

/// One entry of the timer heap.
struct TimerNode {
    int id;                 // key used to look the timer up
    TimeStamp expires;      // absolute expiry time
    TimeoutCallBack cb;     // callback run on expiry

    /// Orders nodes by expiry time. Declared const so that it can be used on
    /// const nodes and by standard algorithms.
    bool operator<(const TimerNode& t) const {
        return expires < t.expires;
    }
};
/// Min-heap of timers ordered by expiry time, with an id -> heap index map
/// for direct access to a given timer.
class HeapTimer {
public:
    HeapTimer() { heap_.reserve(64); }
    ~HeapTimer() { clear(); }

    /// Gives timer id a new timeout (milliseconds from now).
    void adjust(int id, int newExpires);
    /// Adds timer id, or updates it when it already exists.
    void add(int id, int timeOut, const TimeoutCallBack& cb);
    /// Runs the callback of timer id and removes it.
    void doWork(int id);
    /// Removes all timers.
    void clear();
    /// Runs and removes every expired timer.
    void tick();
    /// Removes the timer at the top of the heap.
    void pop();
    /// Runs expired timers and reports the delay until the next one.
    int GetNextTick();

private:
    void del_(size_t i);                       // remove the node at index i
    void siftup_(size_t i);                    // move node i towards the root
    bool siftdown_(size_t index, size_t n);    // move node towards the leaves
    void SwapNode_(size_t i, size_t j);        // swap two nodes, keep ref_ in sync

    std::vector<TimerNode> heap_;              // heap storage
    std::unordered_map<int, size_t> ref_;      // timer id -> index in heap_
};
#endif //HEAP_TIMER_H
heaptimer.cpp
#include "heaptimer.h"
/// Moves node i towards the root until its parent expires strictly earlier.
///
/// The loop stops at the root. The previous version computed (i - 1) / 2 for
/// i == 0, which wraps around for size_t, and looped on `j >= 0`, which is
/// always true for an unsigned value — so it read heap_ out of bounds every
/// time a node reached (or started at) the root.
void HeapTimer::siftup_(size_t i) {
    assert(i < heap_.size());
    while (i > 0) {
        const size_t parent = (i - 1) / 2;
        if (heap_[parent] < heap_[i]) { break; }   // heap property holds
        SwapNode_(i, parent);
        i = parent;
    }
}
void HeapTimer::SwapNode_(size_t i, size_t j) {
assert(i >= 0 && i < heap_.size());
assert(j >= 0 && j < heap_.size());
std::swap(heap_[i], heap_[j]); // 交换节点
ref_[heap_[i].id] = i; // 更新 映射关系
ref_[heap_[j].id] = j;
}
/// Moves the node at index towards the leaves, considering only the first n
/// elements of the heap.
///
/// @return true when the node ended up at a different index.
bool HeapTimer::siftdown_(size_t index, size_t n) {
    assert(index >= 0 && index < heap_.size());
    assert(n >= 0 && n <= heap_.size());
    size_t cur = index;
    for (size_t child = cur * 2 + 1; child < n; child = cur * 2 + 1) {
        const size_t right = child + 1;
        if (right < n && heap_[right] < heap_[child]) {
            child = right;                      // pick the earlier of the two children
        }
        if (heap_[cur] < heap_[child]) {
            break;                              // already in place
        }
        SwapNode_(cur, child);
        cur = child;
    }
    return cur > index;
}
/// Registers timer id to fire timeout milliseconds from now with callback cb.
/// An existing timer with the same id gets the new deadline and callback.
void HeapTimer::add(int id, int timeout, const TimeoutCallBack& cb) {
    assert(id >= 0);
    const auto known = ref_.find(id);
    if (known == ref_.end()) {
        /* new timer: append at the end, then restore heap order */
        const size_t slot = heap_.size();
        ref_[id] = slot;
        heap_.push_back({id, Clock::now() + MS(timeout), cb});
        siftup_(slot);
        return;
    }

    /* existing timer: update in place, then restore heap order */
    const size_t slot = known->second;
    heap_[slot].expires = Clock::now() + MS(timeout);
    heap_[slot].cb = cb;
    const bool movedDown = siftdown_(slot, heap_.size());
    if (!movedDown) {
        siftup_(slot);
    }
}
void HeapTimer::doWork(int id) {
/* 删除指定id结点,并触发回调函数 */
if(heap_.empty() || ref_.count(id) == 0) {
return;
}
size_t i = ref_[id];
TimerNode node = heap_[i];
node.cb();
del_(i);
}
/// Removes the node at the given heap index.
///
/// The node is swapped with the last element, heap order is restored for the
/// element that took its place, and the (now last) node is dropped from both
/// heap_ and ref_.
void HeapTimer::del_(size_t index) {
    assert(!heap_.empty() && index >= 0 && index < heap_.size());
    const size_t last = heap_.size() - 1;
    assert(index <= last);
    if (index < last) {
        SwapNode_(index, last);
        const bool movedDown = siftdown_(index, last);
        if (!movedDown) {
            siftup_(index);
        }
    }
    /* drop the tail element */
    const int removedId = heap_.back().id;
    ref_.erase(removedId);
    heap_.pop_back();
}
void HeapTimer::adjust(int id, int timeout) {
/* 调整指定id的结点 */
assert(!heap_.empty() && ref_.count(id) > 0);
heap_[ref_[id]].expires = Clock::now() + MS(timeout);;
siftdown_(ref_[id], heap_.size());
}
void HeapTimer::tick() {
/* 清除超时结点 */
if(heap_.empty()) {
return;
}
while(!heap_.empty()) {
TimerNode node = heap_.front();
if(std::chrono::duration_cast<MS>(node.expires - Clock::now()).count() > 0) {
break;
}
node.cb();
pop();
}
}
/// Removes the timer at the top of the heap; the heap must not be empty.
void HeapTimer::pop() {
    assert(!heap_.empty());
    const size_t root = 0;
    del_(root);
}

/// Forgets every timer without running any callback.
void HeapTimer::clear() {
    ref_.clear();
    heap_.clear();
}
/// Runs expired timers, then reports how long until the next one expires.
///
/// The result is computed in a signed type. Previously it was stored in a
/// size_t, so the `res < 0` clamp could never trigger and a deadline that had
/// just passed produced a negative return value instead of 0.
///
/// @return milliseconds until the earliest remaining timer (never negative),
///         or -1 when no timer is registered.
int HeapTimer::GetNextTick() {
    tick();
    if (heap_.empty()) {
        return -1;
    }
    const auto remaining =
        std::chrono::duration_cast<MS>(heap_.front().expires - Clock::now()).count();
    if (remaining < 0) {
        return 0;
    }
    return static_cast<int>(remaining);
}
HTTP 连接
httprequest.h
#ifndef HTTP_REQUEST_H
#define HTTP_REQUEST_H
#include <unordered_map>
#include <unordered_set>
#include <string>
#include <regex> // 正则匹配
#include <errno.h>
#include <mysql/mysql.h> //mysql
#include "../buffer/buffer.h"
#include "../log/log.h"
#include "../pool/sqlconnpool.h"
#include "../pool/sqlconnRAII.h"
/// Incremental parser for one HTTP request read from a Buffer.
class HttpRequest {
public:
    /// Parser states, in the order they are visited.
    enum PARSE_STATE {
        REQUEST_LINE,
        HEADERS,
        BODY,
        FINISH,
    };

    /// Result codes for request processing.
    enum HTTP_CODE {
        NO_REQUEST = 0,
        GET_REQUEST,
        BAD_REQUEST,
        NO_RESOURSE,
        FORBIDDENT_REQUEST,
        FILE_REQUEST,
        INTERNAL_ERROR,
        CLOSED_CONNECTION,
    };

    HttpRequest() { Init(); }
    ~HttpRequest() = default;

    /// Resets all parsed fields and the parser state.
    void Init();
    /// Parses as much of buff as possible.
    bool parse(Buffer& buff);

    std::string path() const;
    std::string& path();
    std::string method() const;
    std::string version() const;
    /// Value of a parsed POST field, or "" when absent.
    std::string GetPost(const std::string& key) const;
    std::string GetPost(const char* key) const;

    bool IsKeepAlive() const;

    /*
    todo
    void HttpConn::ParseFormData() {}
    void HttpConn::ParseJson() {}
    */

private:
    bool ParseRequestLine_(const std::string& line);
    void ParseHeader_(const std::string& line);
    void ParseBody_(const std::string& line);

    void ParsePath_();
    void ParsePost_();
    void ParseFromUrlencoded_(int tag);

    static bool UserVerify(const std::string& name, const std::string& pwd, bool isLogin);

    PARSE_STATE state_;
    std::string method_, path_, version_, body_;             // method, path, HTTP version, body
    std::unordered_map<std::string, std::string> header_;    // request headers
    std::unordered_map<std::string, std::string> post_;      // decoded POST fields

    static const std::unordered_set<std::string> DEFAULT_HTML;
    static const std::unordered_map<std::string, int> DEFAULT_HTML_TAG;
    static int ConverHex(char ch);
};
#endif //HTTP_REQUEST_H
httprequest.cpp
#include "httprequest.h"
using namespace std;
// Paths that are served by appending ".html" (see ParsePath_).
const unordered_set<string> HttpRequest::DEFAULT_HTML{
    "/index",
    "/register",
    "/login",
    "/welcome",
    "/video",
    "/picture",
    "/sendmail",
};

// Form pages whose POST body is processed, mapped to the tag used by ParsePost_.
const unordered_map<string, int> HttpRequest::DEFAULT_HTML_TAG {
    {"/register.html", 0},
    {"/login.html", 1},
    {"/sendmail.html", 2},
};
/// Clears every parsed field and rewinds the state machine to REQUEST_LINE.
void HttpRequest::Init() {
    body_.clear();
    version_.clear();
    path_.clear();
    method_.clear();
    state_ = REQUEST_LINE;
    header_.clear();
    post_.clear();
}
/// True when the request carries "Connection: keep-alive" and the parsed
/// HTTP version is "1.1".
bool HttpRequest::IsKeepAlive() const {
    const auto conn = header_.find("Connection");
    if (conn == header_.end()) {
        return false;
    }
    return conn->second == "keep-alive" && version_ == "1.1";
}
// Parses the request contained in buff line by line, driving the
// REQUEST_LINE -> HEADERS -> BODY -> FINISH state machine.
//
// Returns false when buff has nothing to read or the request line does not
// match; returns true in every other case (including when the loop stops
// before reaching FINISH).
//
// NOTE(review): when no "\r\n" is found, std::search returns the end of the
// readable range, so `line` is everything that is left and it is handed to
// the current state's handler as if it were a complete line — confirm that
// callers never pass a partially received request.
bool HttpRequest::parse(Buffer& buff) { // parse the buffer
const char CRLF[] = "\r\n"; // line terminator searched for below
if(buff.ReadableBytes() <= 0) { // nothing to parse: fail
return false;
}
while(buff.ReadableBytes() && state_ != FINISH) { // data left and request not finished
const char* lineEnd = search(buff.Peek(), buff.BeginWriteConst(), CRLF, CRLF + 2); // both ranges are half-open [first, last)
std::string line(buff.Peek(), lineEnd); // the current line without its terminator
switch(state_) // finite state machine
{
case REQUEST_LINE:
if(!ParseRequestLine_(line)) { // parse the request line
return false;
}
ParsePath_();
break;
case HEADERS:
ParseHeader_(line); // parse one header line
if(buff.ReadableBytes() <= 2) { // at most the current terminator is left: no body follows
state_ = FINISH;
}
break;
case BODY:
ParseBody_(line); // parse the body
break;
default:
break;
}
if(lineEnd == buff.BeginWrite()) { break; } // no terminator was found: nothing more to consume
buff.RetrieveUntil(lineEnd + 2); // consume the line and its "\r\n"
}
LOG_DEBUG("[%s], [%s], [%s]", method_.c_str(), path_.c_str(), version_.c_str());
return true; // parsing pass completed
}
/// Maps the request path to a file name: "/" becomes "/index.html" and any
/// path listed in DEFAULT_HTML gets ".html" appended.
void HttpRequest::ParsePath_() {
    if (path_ == "/") {
        path_ = "/index.html";
        return;
    }
    if (DEFAULT_HTML.count(path_) != 0) {
        path_ += ".html";
    }
}
bool HttpRequest::ParseRequestLine_(const string& line) { // 解析请求行
regex patten("^([^ ]*) ([^ ]*) HTTP/([^ ]*)$"); // 正则匹配 用两个空格分隔 分别获得请求方法 请求路径 和 HTTP 版本
smatch subMatch; // 匹配结果
if(regex_match(line, subMatch, patten)) {
method_ = subMatch[1];
path_ = subMatch[2];
version_ = subMatch[3];
state_ = HEADERS; // 更新状态为 获取头部
return true;
}
LOG_ERROR("RequestLine Error"); // 匹配失败 行错误
return false;
}
void HttpRequest::ParseHeader_(const string& line) { // 解析头部
regex patten("^([^:]*): ?(.*)$"); // 用 冒号空格分割的字符串
smatch subMatch;
if(regex_match(line, subMatch, patten)) {
header_[subMatch[1]] = subMatch[2];
}
else { // 如果匹配失败 代表头部解析完成
state_ = BODY; // 更新状态
}
}
/// Stores the body, runs POST processing and marks the request as finished.
///
/// The length is logged with %zu: line.size() is a size_t, and passing it for
/// the previous %d conversion was a format/argument mismatch.
void HttpRequest::ParseBody_(const string& line) {
    body_ = line;
    ParsePost_();
    state_ = FINISH;
    LOG_DEBUG("Body:%s, len:%zu", line.c_str(), line.size());
}
/// Converts one hexadecimal digit to its numeric value (0..15).
///
/// Decimal digits are now handled: previously '0'..'9' fell through to the
/// final return and yielded their character code (48..57), which made every
/// percent-escape containing a decimal digit decode wrongly.
/// Any other character is returned unchanged, as before.
int HttpRequest::ConverHex(char ch) {
    if(ch >= '0' && ch <= '9') return ch - '0';
    if(ch >= 'A' && ch <= 'F') return ch - 'A' + 10;
    if(ch >= 'a' && ch <= 'f') return ch - 'a' + 10;
    return ch;
}
void HttpRequest::ParsePost_() {
if(method_ == "POST" && header_["Content-Type"] == "application/x-www-form-urlencoded") { // 如果内容类型是 url编码
if(DEFAULT_HTML_TAG.count(path_)) {
int tag = DEFAULT_HTML_TAG.find(path_)->second;
LOG_DEBUG("Tag:%d", tag);
if(tag == 0 || tag == 1) {
ParseFromUrlencoded_(tag); // 从 url 中解析数据
bool isLogin = (tag == 1);
if(UserVerify(post_["username"], post_["password"], isLogin)) { // 若果是登录 要进行用户验证
path_ = "/welcome.html";
}
else {
path_ = "/error.html";
}
}
else if(tag == 2){
ParseFromUrlencoded_(tag);
string cmd = "python3 /root/MyWebServer/mail.py ";
cmd += post_["mailto"];
cmd += " ";
cmd += post_["subject"];
cmd += " ";
cmd += post_["content"];
// cout << cmd << endl;
if(system(cmd.c_str()) == 0){
path_ = "/report.html";
}
else{
path_ = "/error.html";
}
}
}
}
}
void HttpRequest::ParseFromUrlencoded_(int tag) {
if(tag == 0 || tag == 1){
if(body_.size() == 0) { return; }
string key, value;
int num = 0;
int n = body_.size();
int i = 0, j = 0;
for(; i < n; i++) {
char ch = body_[i];
switch (ch) {
case '=':
key = body_.substr(j, i - j); // 等号前的是 key
j = i + 1;
break;
case '+':
body_[i] = ' '; // url 编码中 空格被编码成加号
break;
case '%':
num = ConverHex(body_[i + 1]) * 16 + ConverHex(body_[i + 2]); // url 解码 从16进制转换成10进制
body_[i + 2] = num % 10 + '0';
body_[i + 1] = num / 10 + '0';
i += 2;
break;
case '&':
value = body_.substr(j, i - j); // 等号和 & 之间的是 value
j = i + 1; // j 是起始位置 更新
post_[key] = value;
LOG_DEBUG("%s = %s", key.c_str(), value.c_str());
break;
default:
break;
}
}
assert(j <= i);
if(post_.count(key) == 0 && j < i) { // 插入最后一个 value
value = body_.substr(j, i - j);
post_[key] = value;
}
}
else if(tag == 2){
if(body_.size() == 0){
return;
}
string key,value;
int n = body_.size();
size_t i = 0,j = 0;
while(body_.find('&',j) != -1){
i = body_.find('&',j);
size_t k = body_.find('=',j);
key = body_.substr(j,k-j);
value = body_.substr(k+1,i-k-1);
j = i + 1;
post_[key] = value;
}
assert(j < n);
i = body_.find('=',j);
key = body_.substr(j,i-j);
value = body_.substr(i+1,n-i-1);
post_[key] = value;
}
}
bool HttpRequest::UserVerify(const string &name, const string &pwd, bool isLogin) {
if(name == "" || pwd == "") { return false; }
LOG_INFO("Verify name:%s pwd:%s", name.c_str(), pwd.c_str());
MYSQL* sql;
SqlConnRAII(&sql, SqlConnPool::Instance()); // 获得 sql 连接
assert(sql);
bool flag = false;
unsigned int j = 0;
char order[256] = { 0 }; // 查询语句
MYSQL_FIELD *fields = nullptr; // 字段列数组
MYSQL_RES *res = nullptr; // 返回行的一个查询结果集
if(!isLogin) { flag = true; }
/* 查询用户及密码 */
snprintf(order, 256, "SELECT username, password FROM user WHERE username='%s' LIMIT 1", name.c_str());
LOG_DEBUG("%s", order);
if(mysql_query(sql, order)) { // 查询成功返回 0
mysql_free_result(res);
return false;
}
res = mysql_store_result(sql); // 存储结果集
j = mysql_num_fields(res); // 字段数量
fields = mysql_fetch_fields(res); // 字段结构数组
while(MYSQL_ROW row = mysql_fetch_row(res)) { // 获得一行
LOG_DEBUG("MYSQL ROW: %s %s", row[0], row[1]);
string password(row[1]); // row[0] 是用户名 row[1] 是密码
/* 登录行为 验证密码 */
if(isLogin) {
if(pwd == password) { flag = true; }
else {
flag = false;
LOG_DEBUG("pwd error!");
}
}
else {
flag = false;
LOG_DEBUG("user used!");
}
}
mysql_free_result(res);
/* 注册行为 且 用户名未被使用*/
if(!isLogin && flag == true) {
LOG_DEBUG("regirster!");
bzero(order, 256);
snprintf(order, 256,"INSERT INTO user(username, password) VALUES('%s','%s')", name.c_str(), pwd.c_str());
LOG_DEBUG( "%s", order);
if(mysql_query(sql, order)) {
LOG_DEBUG( "Insert error!");
flag = false;
}
flag = true;
}
SqlConnPool::Instance()->FreeConn(sql);
LOG_DEBUG( "UserVerify success!!");
return flag;
}
/// Copy of the (possibly rewritten) request path.
std::string HttpRequest::path() const{
    std::string copy(path_);
    return copy;
}

/// Mutable reference to the request path.
std::string& HttpRequest::path(){
    std::string& ref = path_;
    return ref;
}

/// Copy of the request method.
std::string HttpRequest::method() const {
    std::string copy(method_);
    return copy;
}

/// Copy of the HTTP version string.
std::string HttpRequest::version() const {
    std::string copy(version_);
    return copy;
}
/// Value of POST field key, or "" when the field is absent. key must not be empty.
std::string HttpRequest::GetPost(const std::string& key) const {
    assert(key != "");
    const auto field = post_.find(key);
    if (field == post_.end()) {
        return "";
    }
    return field->second;
}

/// Overload for C strings; key must not be null.
std::string HttpRequest::GetPost(const char* key) const {
    assert(key != nullptr);
    const auto field = post_.find(key);
    if (field == post_.end()) {
        return "";
    }
    return field->second;
}
httpresponse.h
#ifndef HTTP_RESPONSE_H
#define HTTP_RESPONSE_H
#include <unordered_map>
#include <fcntl.h> // open
#include <unistd.h> // close
#include <sys/stat.h> // stat
#include <sys/mman.h> // mmap, munmap 共享内存
#include "../buffer/buffer.h"
#include "../log/log.h"
/// Builds the HTTP response (status line, headers, content) for one request.
class HttpResponse {
public:
    HttpResponse();
    ~HttpResponse();

    /// Prepares the object for a new response.
    void Init(const std::string& srcDir, std::string& path, bool isKeepAlive = false, int code = -1);
    /// Writes status line, headers and content into buff.
    void MakeResponse(Buffer& buff);
    void UnmapFile();
    /// Pointer stored in mmFile_.
    char* File();
    /// Size recorded in mmFileStat_.
    size_t FileLen() const;
    void ErrorContent(Buffer& buff, std::string message);
    int Code() const { return code_; }

private:
    void AddStateLine_(Buffer &buff);
    void AddHeader_(Buffer &buff);
    void AddContent_(Buffer &buff);

    void ErrorHtml_();
    std::string GetFileType_();

    int code_;                  // HTTP status code, -1 while undecided
    bool isKeepAlive_;          // whether to answer with keep-alive

    std::string path_;          // requested path relative to srcDir_
    std::string srcDir_;        // root directory of the served files

    char* mmFile_;              // file pointer returned by File()
    struct stat mmFileStat_;    // stat() result for the file

    static const std::unordered_map<std::string, std::string> SUFFIX_TYPE;
    static const std::unordered_map<int, std::string> CODE_STATUS;
    static const std::unordered_map<int, std::string> CODE_PATH;
};
#endif //HTTP_RESPONSE_H
httpresponse.cpp
#include "httpresponse.h"
using namespace std;
// File suffix -> MIME type.
// Fixed: ".word" was mapped to the misspelt "application/nsword", and the
// ".css" / ".js" values carried a trailing space.
const unordered_map<string, string> HttpResponse::SUFFIX_TYPE = {
    { ".html",  "text/html" },
    { ".xml",   "text/xml" },
    { ".xhtml", "application/xhtml+xml" },
    { ".txt",   "text/plain" },
    { ".rtf",   "application/rtf" },
    { ".pdf",   "application/pdf" },
    { ".word",  "application/msword" },
    { ".png",   "image/png" },
    { ".gif",   "image/gif" },
    { ".jpg",   "image/jpeg" },
    { ".jpeg",  "image/jpeg" },
    { ".au",    "audio/basic" },
    { ".mpeg",  "video/mpeg" },
    { ".mpg",   "video/mpeg" },
    { ".avi",   "video/x-msvideo" },
    { ".gz",    "application/x-gzip" },
    { ".tar",   "application/x-tar" },
    { ".css",   "text/css" },
    { ".js",    "text/javascript" },
};

// Status code -> reason phrase used in the status line.
const unordered_map<int, string> HttpResponse::CODE_STATUS = {
    { 200, "OK" },
    { 400, "Bad Request" },
    { 403, "Forbidden" },
    { 404, "Not Found" },
};

// Error status code -> page served for it.
const unordered_map<int, string> HttpResponse::CODE_PATH = {
    { 400, "/400.html" },
    { 403, "/403.html" },
    { 404, "/404.html" },
};
HttpResponse::HttpResponse() { // 构造函数
code_ = -1;
path_ = srcDir_ = "";
isKeepAlive_ = false;
mmFile_ = nullptr;
mmFileStat_ = { 0 };
};
// Releases the file mapping (if any) when the response object goes away.
HttpResponse::~HttpResponse()
{
    UnmapFile();
}
void HttpResponse::Init(const string& srcDir, string& path, bool isKeepAlive, int code){ // 初始化
assert(srcDir != "");
if(mmFile_) { UnmapFile(); }
code_ = code;
isKeepAlive_ = isKeepAlive;
path_ = path;
srcDir_ = srcDir;
mmFile_ = nullptr;
mmFileStat_ = { 0 }; // 文件信息结构体 stat
}
// Decides the final status code from the state of the requested file and
// writes the status line, headers and content headers into buff.
void HttpResponse::MakeResponse(Buffer& buff) {
    const string target = srcDir_ + path_;

    // A missing file or a directory is reported as 404.
    const bool notServable =
        stat(target.data(), &mmFileStat_) < 0 || S_ISDIR(mmFileStat_.st_mode);

    if (notServable) {
        code_ = 404;
    } else if ((mmFileStat_.st_mode & S_IROTH) == 0) {
        // Not readable by "others": forbidden.
        code_ = 403;
    } else if (code_ == -1) {
        code_ = 200;
    }

    ErrorHtml_();           // switch to the error page if the code has one
    AddStateLine_(buff);
    AddHeader_(buff);
    AddContent_(buff);
}
// Start address of the mapped file; nullptr when no file is mapped.
char* HttpResponse::File()
{
    return mmFile_;
}
// Size in bytes of the file described by the last stat() result.
size_t HttpResponse::FileLen() const
{
    return static_cast<size_t>(mmFileStat_.st_size);
}
// If the current status code has a dedicated error page, serve that page
// instead of the requested path and refresh the stat information for it.
void HttpResponse::ErrorHtml_() {
    const auto page = CODE_PATH.find(code_);
    if (page == CODE_PATH.end()) {
        return;
    }
    path_ = page->second;
    stat((srcDir_ + path_).data(), &mmFileStat_);
}
// Appends the status line "HTTP/1.1 <code> <reason>\r\n" to buff.
// A code without a known reason phrase is turned into 400.
void HttpResponse::AddStateLine_(Buffer& buff) {
    auto entry = CODE_STATUS.find(code_);
    if (entry == CODE_STATUS.end()) {
        code_ = 400;
        entry = CODE_STATUS.find(400);
    }
    const string& reason = entry->second;
    buff.Append("HTTP/1.1 " + to_string(code_) + " " + reason + "\r\n");
}
// Appends the Connection header (plus keep-alive parameters when enabled)
// and the Content-type header to buff.
void HttpResponse::AddHeader_(Buffer& buff) {
    buff.Append("Connection: ");
    if (!isKeepAlive_) {
        buff.Append("close\r\n");
    } else {
        buff.Append("keep-alive\r\n");
        buff.Append("keep-alive: max=6, timeout=120\r\n");
    }

    const string mimeType = GetFileType_();
    buff.Append("Content-type: " + mimeType + "\r\n");
}
// Maps the target file read-only into memory and appends the Content-length
// header (followed by the blank line that ends the header section) to buff.
// If the file cannot be opened or mapped, an HTML error body is appended
// instead and mmFile_ stays nullptr.
void HttpResponse::AddContent_(Buffer& buff) {
    const string fullPath = srcDir_ + path_;

    const int srcFd = open(fullPath.data(), O_RDONLY);
    if (srcFd < 0) {
        ErrorContent(buff, "File NotFound!");
        return;
    }

    LOG_DEBUG("file path %s", fullPath.data());

    // MAP_PRIVATE creates a private copy-on-write mapping of the file.
    void* const mapped = mmap(nullptr, mmFileStat_.st_size, PROT_READ, MAP_PRIVATE, srcFd, 0);

    // The mapping does not depend on the descriptor staying open, so close it
    // right away; this also prevents leaking it on the failure path below.
    close(srcFd);

    // mmap reports failure by returning MAP_FAILED; the returned pointer must
    // be compared against it, never dereferenced to look for -1.
    if (mapped == MAP_FAILED) {
        ErrorContent(buff, "File NotFound!");
        return;
    }

    mmFile_ = static_cast<char*>(mapped);
    buff.Append("Content-length: " + to_string(mmFileStat_.st_size) + "\r\n\r\n");
}
// Releases the current file mapping; does nothing when no file is mapped.
void HttpResponse::UnmapFile() {
    if (mmFile_ == nullptr) {
        return;
    }
    munmap(mmFile_, mmFileStat_.st_size);
    mmFile_ = nullptr;
}
// Returns the MIME type for path_ based on its suffix (text after the last
// '.', dot included). Falls back to "text/plain" when there is no suffix or
// the suffix is not in SUFFIX_TYPE.
string HttpResponse::GetFileType_() {
    const string::size_type dot = path_.rfind('.');
    if (dot != string::npos) {
        const auto match = SUFFIX_TYPE.find(path_.substr(dot));
        if (match != SUFFIX_TYPE.end()) {
            return match->second;
        }
    }
    return "text/plain";
}
// Appends a Content-length header and a small generated HTML error page to
// buff. The page shows the current status code, its reason phrase
// ("Bad Request" for unknown codes) and the given message.
void HttpResponse::ErrorContent(Buffer& buff, string message)
{
    const auto known = CODE_STATUS.find(code_);
    const string reason = (known != CODE_STATUS.end()) ? known->second : string("Bad Request");

    string page = "<html><title>Error</title>";
    page += "<body bgcolor=\"ffffff\">";
    page += to_string(code_) + " : " + reason + "\n";
    page += "<p>" + message + "</p>";
    page += "<hr><em>TinyWebServer</em></body></html>";

    buff.Append("Content-length: " + to_string(page.size()) + "\r\n\r\n");
    buff.Append(page);
}
httpconn.h
#ifndef HTTP_CONN_H
#define HTTP_CONN_H
#include <sys/types.h>
#include <sys/uio.h> // readv/writev
#include <arpa/inet.h> // sockaddr_in
#include <stdlib.h> // atoi()
#include <errno.h>
#include "../log/log.h"
#include "../pool/sqlconnRAII.h"
#include "../buffer/buffer.h"
#include "httprequest.h"
#include "httpresponse.h"
// One client connection: owns the socket descriptor, the read/write buffers
// and the request/response objects used to serve it.
class HttpConn {
public:
HttpConn();
// Closes the connection if it is still open.
~HttpConn();
// Binds this object to an accepted socket and its peer address.
void init(int sockFd, const sockaddr_in& addr);
// Reads from the socket into the read buffer; errno is reported through saveErrno.
ssize_t read(int* saveErrno);
// Sends the pending response with writev; errno is reported through saveErrno.
ssize_t write(int* saveErrno);
// Unmaps the response file and, if still open, closes the socket.
void Close();
int GetFd() const;
int GetPort() const;
const char* GetIP() const;
sockaddr_in GetAddr() const;
// Parses the buffered request and prepares the response; returns false
// when the read buffer is empty.
bool process();
// Number of response bytes still waiting to be sent (both iovec entries).
int ToWriteBytes() {
return iov_[0].iov_len + iov_[1].iov_len;
}
// Whether the current request asked to keep the connection open.
bool IsKeepAlive() const {
return request_.IsKeepAlive();
}
static bool isET;                   // true when connections use edge-triggered epoll
static const char* srcDir; // resource directory
static std::atomic<int> userCount;  // number of currently open connections
private:
int fd_;                            // socket descriptor, -1 when unset
struct sockaddr_in addr_;           // peer address
bool isClose_;                      // true once the connection has been closed
int iovCnt_;                        // number of iov_ entries passed to writev
struct iovec iov_[2];               // [0] header bytes in writeBuff_, [1] mapped file
Buffer readBuff_; // read buffer
Buffer writeBuff_; // write buffer
HttpRequest request_; // request
HttpResponse response_; // response
};
#endif //HTTP_CONN_H
httpconn.cpp
#include "httpconn.h"
using namespace std;
// Definitions of HttpConn's static data members.
// Resource directory shared by all connections.
const char* HttpConn::srcDir;
// Number of currently open connections; incremented in init(), decremented in Close().
std::atomic<int> HttpConn::userCount;
// Selects the looping behaviour of read()/write().
bool HttpConn::isET;
HttpConn::HttpConn() {
fd_ = -1;
addr_ = { 0 };
isClose_ = true;
};
// Makes sure the socket is closed and the response file is unmapped.
HttpConn::~HttpConn()
{
    Close();
}
// Binds this object to an accepted client socket.
//   fd    connected socket descriptor (must be > 0)
//   addr  peer address of the client
// Both buffers are emptied and the global user counter is incremented.
void HttpConn::init(int fd, const sockaddr_in& addr) {
    assert(fd > 0);
    ++userCount;

    fd_ = fd;
    addr_ = addr;

    readBuff_.RetrieveAll();
    writeBuff_.RetrieveAll();

    isClose_ = false;
    LOG_INFO("Client[%d](%s:%d) in, userCount:%d", fd_, GetIP(), GetPort(), (int)userCount);
}
// Unmaps the response file and closes the socket.
// The socket is closed (and the user counter decremented) only once.
void HttpConn::Close() {
    response_.UnmapFile();

    if (isClose_) {
        return;
    }

    isClose_ = true;
    --userCount;
    close(fd_);
    LOG_INFO("Client[%d](%s:%d) quit, UserCount:%d", fd_, GetIP(), GetPort(), (int)userCount);
}
// Socket descriptor of this connection (-1 if never initialised).
int HttpConn::GetFd() const
{
    return fd_;
}
// Copy of the peer address stored by init().
sockaddr_in HttpConn::GetAddr() const
{
    return addr_;
}
// Peer IPv4 address in dotted-decimal form, as produced by inet_ntoa().
const char* HttpConn::GetIP() const
{
    const char* dotted = inet_ntoa(addr_.sin_addr);
    return dotted;
}
// Peer port number in host byte order.
// sin_port is stored in network byte order, so it has to be converted with
// ntohs(); returning the raw field yields a byte-swapped port on
// little-endian hosts.
int HttpConn::GetPort() const {
    return ntohs(addr_.sin_port);
}
// Reads from the socket into the read buffer.
// When isET is set, keeps reading until ReadFd returns <= 0; otherwise it
// reads once. Returns the result of the last ReadFd call; ReadFd receives
// saveErrno for error reporting.
ssize_t HttpConn::read(int* saveErrno) {
    ssize_t received = -1;
    for (;;) {
        received = readBuff_.ReadFd(fd_, saveErrno);
        if (received <= 0 || !isET) {
            break;
        }
    }
    return received;
}
// Sends the pending response (iov_[0]: header bytes held in writeBuff_,
// iov_[1]: file data) with writev.
// The loop repeats while isET is set or more than 10240 bytes remain, and
// stops early when writev returns <= 0, in which case errno is stored in
// *saveErrno. Returns the result of the last writev call.
ssize_t HttpConn::write(int* saveErrno) {
ssize_t len = -1;
do {
len = writev(fd_, iov_, iovCnt_);
if(len <= 0) {
*saveErrno = errno;
break;
}
if(iov_[0].iov_len + iov_[1].iov_len == 0) { break; } /* transfer finished */
else if(static_cast<size_t>(len) > iov_[0].iov_len) { // first chunk fully sent, second chunk partially sent
iov_[1].iov_base = (uint8_t*) iov_[1].iov_base + (len - iov_[0].iov_len); // advance the start of the second chunk
iov_[1].iov_len -= (len - iov_[0].iov_len); // shrink the second chunk by the bytes sent from it
if(iov_[0].iov_len) { // first chunk still marked as pending
writeBuff_.RetrieveAll(); // reset the write buffer
iov_[0].iov_len = 0; // mark the first chunk as done
}
}
else { // first chunk not fully sent yet
iov_[0].iov_base = (uint8_t*)iov_[0].iov_base + len; // advance the start of the first chunk
iov_[0].iov_len -= len; // shrink the first chunk
writeBuff_.Retrieve(len); // drop len bytes from the write buffer
}
} while(isET || ToWriteBytes() > 10240);
return len;
}
// Parses the request held in the read buffer and prepares the response.
// Returns false when the read buffer is empty (nothing to process), true
// once a response is ready to be sent: iov_[0] covers the bytes in
// writeBuff_ and iov_[1] covers the mapped file, if there is one.
bool HttpConn::process() {
    request_.Init();
    if (readBuff_.ReadableBytes() <= 0) {
        return false;
    }

    if (request_.parse(readBuff_)) {
        LOG_DEBUG("%s", request_.path().c_str());
        response_.Init(srcDir, request_.path(), request_.IsKeepAlive(), 200);
    } else {
        response_.Init(srcDir, request_.path(), false, 400);
    }

    response_.MakeResponse(writeBuff_);

    /* response header */
    iov_[0].iov_base = const_cast<char*>(writeBuff_.Peek());
    iov_[0].iov_len = writeBuff_.ReadableBytes();
    iovCnt_ = 1;

    /* file */
    // Always reset the second entry first. This object is reused across
    // requests and connections, so without the reset a leftover length from an
    // interrupted transfer would be counted by ToWriteBytes() even though the
    // current response carries no file.
    iov_[1].iov_base = nullptr;
    iov_[1].iov_len = 0;
    if (response_.FileLen() > 0 && response_.File()) {
        iov_[1].iov_base = response_.File();
        iov_[1].iov_len = response_.FileLen();
        iovCnt_ = 2;
    }

    // FileLen() returns size_t, so it needs %zu rather than %d.
    LOG_DEBUG("filesize:%zu, %d to %d", response_.FileLen(), iovCnt_, ToWriteBytes());
    return true;
}
Server
epoller.h
#ifndef EPOLLER_H
#define EPOLLER_H
#include <sys/epoll.h> //epoll_ctl()
#include <fcntl.h> // fcntl()
#include <unistd.h> // close()
#include <assert.h> // assert()
#include <vector>
#include <errno.h>
// Thin wrapper around an epoll instance and the array that receives its events.
class Epoller {
public:
// maxEvent: capacity of the event array handed to epoll_wait.
explicit Epoller(int maxEvent = 1024);
// Closes the epoll descriptor.
~Epoller();
// Registers fd with the given event mask; false on a negative fd or epoll_ctl failure.
bool AddFd(int fd, uint32_t events);
// Changes the event mask of a registered fd; false on a negative fd or epoll_ctl failure.
bool ModFd(int fd, uint32_t events);
// Removes fd from the epoll set; false on a negative fd or epoll_ctl failure.
bool DelFd(int fd);
// Calls epoll_wait with the given timeout in milliseconds (-1 blocks) and returns its result.
int Wait(int timeoutMs = -1);
// Descriptor stored in the i-th entry of the event array.
int GetEventFd(size_t i) const;
// Event mask stored in the i-th entry of the event array.
uint32_t GetEvents(size_t i) const;
private:
int epollFd_;                               // descriptor returned by epoll_create
std::vector<struct epoll_event> events_;    // output array for epoll_wait
};
#endif //EPOLLER_H
epoller.cpp
#include "epoller.h"
// Creates the epoll instance and an event array with maxEvent slots.
Epoller::Epoller(int maxEvent)
    : epollFd_(epoll_create(512)),
      events_(maxEvent)
{
    assert(epollFd_ >= 0 && !events_.empty());
}
// Closes the epoll descriptor.
Epoller::~Epoller()
{
    close(epollFd_);
}
// Registers fd in the epoll set with the given event mask.
// Returns false for a negative fd or when epoll_ctl fails.
bool Epoller::AddFd(int fd, uint32_t events) {
    if (fd < 0) {
        return false;
    }
    epoll_event request{};
    request.events = events;
    request.data.fd = fd;
    const int rc = epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &request);
    return rc == 0;
}
// Replaces the event mask of an already registered fd.
// Returns false for a negative fd or when epoll_ctl fails.
bool Epoller::ModFd(int fd, uint32_t events) {
    if (fd < 0) {
        return false;
    }
    epoll_event request{};
    request.events = events;
    request.data.fd = fd;
    const int rc = epoll_ctl(epollFd_, EPOLL_CTL_MOD, fd, &request);
    return rc == 0;
}
// Removes fd from the epoll set.
// Returns false for a negative fd or when epoll_ctl fails.
bool Epoller::DelFd(int fd) {
    if (fd < 0) {
        return false;
    }
    epoll_event unused{};
    const int rc = epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, &unused);
    return rc == 0;
}
// Waits for events for up to timeoutMs milliseconds (-1 blocks) and returns
// the epoll_wait result. Ready events are stored in events_.
int Epoller::Wait(int timeoutMs) {
    const int capacity = static_cast<int>(events_.size());
    return epoll_wait(epollFd_, events_.data(), capacity, timeoutMs);
}
// Descriptor stored in the i-th slot of the event array.
// i must be a valid index; an unsigned index is never negative, so only the
// upper bound is checked.
int Epoller::GetEventFd(size_t i) const {
    assert(i < events_.size());
    const epoll_event& slot = events_[i];
    return slot.data.fd;
}
// Event mask stored in the i-th slot of the event array.
// i must be a valid index; an unsigned index is never negative, so only the
// upper bound is checked.
uint32_t Epoller::GetEvents(size_t i) const {
    assert(i < events_.size());
    const epoll_event& slot = events_[i];
    return slot.events;
}
webserver.h
#ifndef WEBSERVER_H
#define WEBSERVER_H
#include <unordered_map>
#include <fcntl.h> // fcntl()
#include <unistd.h> // close()
#include <assert.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "epoller.h"
#include "../log/log.h"
#include "../timer/heaptimer.h"
#include "../pool/sqlconnpool.h"
#include "../pool/threadpool.h"
#include "../pool/sqlconnRAII.h"
#include "../http/httpconn.h"
// The server itself: owns the listening socket, the epoll wrapper, the timer,
// the thread pool and the table of client connections.
class WebServer {
public:
// Sets up the resource directory, database pool, event modes, listening
// socket and (optionally) logging.
WebServer(
int port, int trigMode, int timeoutMS, bool OptLinger,
int sqlPort, const char* sqlUser, const char* sqlPwd,
const char* dbName, int connPoolNum, int threadNum,
bool openLog, int logLevel, int logQueSize);
~WebServer();
// Runs the event loop until isClose_ becomes true.
void Start();
private:
bool InitSocket_();                         // creates, binds and registers the listening socket
void InitEventMode_(int trigMode);          // selects LT/ET for listen and connection sockets
void AddClient_(int fd, sockaddr_in addr);  // registers a newly accepted client
void DealListen_();                         // accepts pending connections
void DealWrite_(HttpConn* client);          // queues a write task for client
void DealRead_(HttpConn* client);           // queues a read task for client
void SendError_(int fd, const char*info);   // sends info to fd and closes it
void ExtentTime_(HttpConn* client);         // pushes back the client's timeout
void CloseConn_(HttpConn* client);          // removes the client from epoll and closes it
void OnRead_(HttpConn* client);             // read task body
void OnWrite_(HttpConn* client);            // write task body
void OnProcess(HttpConn* client);           // processes the request and re-arms epoll
static const int MAX_FD = 65536;            // upper bound on simultaneous clients
static int SetFdNonblock(int fd);           // puts fd into non-blocking mode
int port_;                                  // listening port
bool openLinger_;                           // whether SO_LINGER is enabled on the listening socket
int timeoutMS_; /* milliseconds */
bool isClose_;                              // true when the server should stop / failed to start
int listenFd_;                              // listening socket descriptor
char* srcDir_;                              // resource directory path, released with free()
uint32_t listenEvent_;                      // epoll event mask for the listening socket
uint32_t connEvent_;                        // epoll event mask for client sockets
std::unique_ptr<HeapTimer> timer_;
std::unique_ptr<ThreadPool> threadpool_;
std::unique_ptr<Epoller> epoller_;
std::unordered_map<int, HttpConn> users_;   // client connections keyed by descriptor
};
#endif //WEBSERVER_H
webserver.cpp
#include "webserver.h"
using namespace std;
// Builds the server.
//   port, trigMode, timeoutMS, OptLinger   listening socket / event settings
//   sqlPort, sqlUser, sqlPwd, dbName,
//   connPoolNum                            database connection pool settings
//   threadNum                              number of worker threads
//   openLog, logLevel, logQueSize          logging settings
// The resource directory is "<current working directory>/resources/".
// If the listening socket cannot be set up, isClose_ is set to true.
WebServer::WebServer(
        int port, int trigMode, int timeoutMS, bool OptLinger,
        int sqlPort, const char* sqlUser, const char* sqlPwd,
        const char* dbName, int connPoolNum, int threadNum,
        bool openLog, int logLevel, int logQueSize):
        port_(port), openLinger_(OptLinger), timeoutMS_(timeoutMS), isClose_(false),
        timer_(new HeapTimer()), threadpool_(new ThreadPool(threadNum)), epoller_(new Epoller())
{
    // Let getcwd allocate a buffer that fits the path (size 0 with a null
    // buffer), then grow it to make room for the suffix. The previous code
    // requested a fixed 256-byte buffer and appended to it with strncat,
    // which overflowed the allocation for long working directories.
    static const char kResourceSuffix[] = "/resources/";
    srcDir_ = getcwd(nullptr, 0);
    assert(srcDir_);
    const size_t cwdLen = strlen(srcDir_);
    char* const grown = static_cast<char*>(realloc(srcDir_, cwdLen + sizeof(kResourceSuffix)));
    assert(grown);
    srcDir_ = grown;
    memcpy(srcDir_ + cwdLen, kResourceSuffix, sizeof(kResourceSuffix));   // copies the terminating NUL too

    HttpConn::userCount = 0;
    HttpConn::srcDir = srcDir_;

    // Initialise the database connection pool.
    SqlConnPool::Instance()->Init("localhost", sqlPort, sqlUser, sqlPwd, dbName, connPoolNum);

    InitEventMode_(trigMode);                   // choose the epoll trigger modes
    if(!InitSocket_()) { isClose_ = true; }     // create the listening socket

    if(openLog) {
        Log::Instance()->init(logLevel, "./log", ".log", logQueSize);
        if(isClose_) { LOG_ERROR("========== Server init error!=========="); }
        else {
            LOG_INFO("========== Server init ==========");
            LOG_INFO("Port:%d, OpenLinger: %s", port_, OptLinger? "true":"false");
            LOG_INFO("Listen Mode: %s, OpenConn Mode: %s",
                            (listenEvent_ & EPOLLET ? "ET": "LT"),
                            (connEvent_ & EPOLLET ? "ET": "LT"));
            LOG_INFO("LogSys level: %d", logLevel);
            LOG_INFO("srcDir: %s", HttpConn::srcDir);
            LOG_INFO("SqlConnPool num: %d, ThreadPool num: %d", connPoolNum, threadNum);
        }
    }
}
// Shuts the server down: closes the listening socket, releases the resource
// directory string and closes the database connection pool.
WebServer::~WebServer()
{
    isClose_ = true;
    close(listenFd_);
    free(srcDir_);
    SqlConnPool::Instance()->ClosePool();
}
// Chooses the epoll trigger mode for the listening socket and for client
// sockets:
//   0 -> both level-triggered
//   1 -> clients edge-triggered
//   2 -> listening socket edge-triggered
//   3 or any other value -> both edge-triggered
// Client sockets always use EPOLLONESHOT so each event fires only once
// until the descriptor is re-armed.
void WebServer::InitEventMode_(int trigMode) {
    const bool connEdge = (trigMode != 0 && trigMode != 2);
    const bool listenEdge = (trigMode != 0 && trigMode != 1);

    listenEvent_ = EPOLLRDHUP;
    connEvent_ = EPOLLONESHOT | EPOLLRDHUP;

    if (listenEdge) {
        listenEvent_ |= EPOLLET;
    }
    if (connEdge) {
        connEvent_ |= EPOLLET;
    }

    HttpConn::isET = (connEvent_ & EPOLLET) != 0;
}
// Main event loop: waits on epoll and dispatches every ready descriptor until
// isClose_ is set. For each event the checks run in this order: listening
// socket, hang-up/error, readable, writable.
void WebServer::Start() {
int timeMS = -1; /* epoll wait timeout == -1: block until an event arrives */
if(!isClose_) { LOG_INFO("========== Server start =========="); }
while(!isClose_) {
if(timeoutMS_ > 0) {
timeMS = timer_->GetNextTick(); // per the original note, GetNextTick also runs tick() to drop expired nodes
}
int eventCnt = epoller_->Wait(timeMS);
for(int i = 0; i < eventCnt; i++) {
/* handle one event */
int fd = epoller_->GetEventFd(i);
uint32_t events = epoller_->GetEvents(i);
if(fd == listenFd_) {
DealListen_(); // event on the listening socket: accept new connections
}
else if(events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) { // hang-up or error: close the connection
assert(users_.count(fd) > 0);
CloseConn_(&users_[fd]);
}
else if(events & EPOLLIN) { // readable
assert(users_.count(fd) > 0);
DealRead_(&users_[fd]); // schedule the read
}
else if(events & EPOLLOUT) { // writable
assert(users_.count(fd) > 0);
DealWrite_(&users_[fd]); // schedule the write
} else {
LOG_ERROR("Unexpected event");
}
}
}
}
// Sends the text info to fd, logs a warning if that fails, and closes fd.
void WebServer::SendError_(int fd, const char*info) {
    assert(fd > 0);

    const int sent = send(fd, info, strlen(info), 0);
    if (sent < 0) {
        LOG_WARN("send error to client[%d] error!", fd);
    }

    close(fd);
}
// Closes an HTTP connection: removes its descriptor from epoll, then closes
// the connection object.
void WebServer::CloseConn_(HttpConn* client) {
    assert(client);

    const int fd = client->GetFd();
    LOG_INFO("Client[%d] quit!", fd);

    epoller_->DelFd(fd);
    client->Close();
}
void WebServer::AddClient_(int fd, sockaddr_in addr) {
assert(fd > 0);
users_[fd].init(fd, addr);
if(timeoutMS_ > 0) {
timer_->add(fd, timeoutMS_, std::bind(&WebServer::CloseConn_, this, &users_[fd])); //加入定时器 利用 bind 加 functional 实现回调函数 关闭连接
}
epoller_->AddFd(fd, EPOLLIN | connEvent_); // 在 epoll 中注册 客户端连接 读事件
SetFdNonblock(fd); // 设置文件描述符 非阻塞
LOG_INFO("Client[%d] in!", users_[fd].GetFd());
}
// Handles an event on the listening socket by accepting connections.
// In edge-triggered mode it keeps accepting until accept() stops returning a
// positive descriptor; in level-triggered mode it accepts one connection.
// When the client limit is reached, the new client gets "Server busy!" and
// is closed.
void WebServer::DealListen_() {
    struct sockaddr_in peer;
    socklen_t peerLen = sizeof(peer);

    while (true) {
        const int clientFd = accept(listenFd_, (struct sockaddr *)&peer, &peerLen);
        if (clientFd <= 0) {
            return;
        }
        if (HttpConn::userCount >= MAX_FD) {
            SendError_(clientFd, "Server busy!");
            LOG_WARN("Clients is full!");
            return;
        }
        AddClient_(clientFd, peer);

        if (!(listenEvent_ & EPOLLET)) {
            break;
        }
    }
}
// Refreshes the client's timeout and hands the read work to the thread pool.
void WebServer::DealRead_(HttpConn* client) {
    assert(client);
    ExtentTime_(client);
    threadpool_->AddTask([this, client] { OnRead_(client); });
}
// Refreshes the client's timeout and hands the write work to the thread pool.
void WebServer::DealWrite_(HttpConn* client) {
    assert(client);
    ExtentTime_(client);
    threadpool_->AddTask([this, client] { OnWrite_(client); });
}
// Re-arms the client's timer with the full timeout; no-op when timeouts are disabled.
void WebServer::ExtentTime_(HttpConn* client) {
    assert(client);
    if (timeoutMS_ <= 0) {
        return;
    }
    timer_->adjust(client->GetFd(), timeoutMS_);
}
// Read task: pulls data from the client socket, then processes the request.
// The connection is closed when the read returns <= 0 for any reason other
// than EAGAIN.
void WebServer::OnRead_(HttpConn* client) {
    assert(client);

    int readErrno = 0;
    const int got = client->read(&readErrno);

    const bool failed = (got <= 0 && readErrno != EAGAIN);
    if (failed) {
        CloseConn_(client);
        return;
    }

    OnProcess(client);
}
// Processes the buffered request and re-arms the client's descriptor:
// for writing when a response is ready, for reading when the read buffer
// held nothing to process.
void WebServer::OnProcess(HttpConn* client) {
    const bool responseReady = client->process();
    const uint32_t interest = responseReady ? EPOLLOUT : EPOLLIN;
    epoller_->ModFd(client->GetFd(), connEvent_ | interest);
}
// Write task: sends the pending response.
//  - Everything sent and keep-alive requested: process the next request.
//  - Data left and the write failed with EAGAIN: wait for the socket to
//    become writable again.
//  - Anything else: close the connection.
void WebServer::OnWrite_(HttpConn* client) {
    assert(client);

    int writeErrno = 0;
    const int sent = client->write(&writeErrno);
    const bool allSent = (client->ToWriteBytes() == 0);

    if (allSent && client->IsKeepAlive()) {
        /* transfer complete */
        OnProcess(client);
        return;
    }

    if (!allSent && sent < 0 && writeErrno == EAGAIN) {
        /* continue the transfer later */
        epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLOUT);
        return;
    }

    CloseConn_(client);
}
/* Create listenFd */
// Creates the listening socket on port_, applies SO_LINGER and SO_REUSEADDR,
// binds, listens, registers the socket with epoll and makes it non-blocking.
// Returns false (after closing the socket where one was created) as soon as
// any step fails, true on success.
bool WebServer::InitSocket_() {
int ret;
struct sockaddr_in addr;
// Only ports in the range 80..65535 are accepted.
if(port_ > 65535 || port_ < 80) {
LOG_ERROR("Port:%d error!", port_);
return false;
}
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY); // host to network byte order
addr.sin_port = htons(port_); // host to network byte order
struct linger optLinger = { 0 }; // SO_LINGER settings (disabled unless openLinger_)
if(openLinger_) {
/* graceful close: linger until remaining data is sent or the timeout expires */
optLinger.l_onoff = 1;
optLinger.l_linger = 1;
}
listenFd_ = socket(AF_INET, SOCK_STREAM, 0); // create the TCP socket
if(listenFd_ < 0) {
// NOTE(review): port_ is passed although the format string has no placeholder.
LOG_ERROR("Create socket error!", port_);
return false;
}
ret = setsockopt(listenFd_, SOL_SOCKET, SO_LINGER, &optLinger, sizeof(optLinger)); // apply the linger option
if(ret < 0) {
close(listenFd_);
// NOTE(review): port_ is passed although the format string has no placeholder.
LOG_ERROR("Init linger error!", port_);
return false;
}
int optval = 1;
/* address reuse */
/* original author's note: only the last socket receives data normally */
ret = setsockopt(listenFd_, SOL_SOCKET, SO_REUSEADDR, (const void*)&optval, sizeof(int)); // enable SO_REUSEADDR
if(ret == -1) {
LOG_ERROR("set socket setsockopt error !");
close(listenFd_);
return false;
}
ret = bind(listenFd_, (struct sockaddr *)&addr, sizeof(addr));
if(ret < 0) {
LOG_ERROR("Bind Port:%d error!", port_);
close(listenFd_);
return false;
}
ret = listen(listenFd_, 6);
if(ret < 0) {
LOG_ERROR("Listen port:%d error!", port_);
close(listenFd_);
return false;
}
ret = epoller_->AddFd(listenFd_, listenEvent_ | EPOLLIN); // watch the listening socket for incoming connections
// AddFd returns a bool, so ret == 0 here means the registration failed.
if(ret == 0) {
LOG_ERROR("Add listen error!");
close(listenFd_);
return false;
}
SetFdNonblock(listenFd_); // make the listening socket non-blocking
LOG_INFO("Server port:%d", port_);
return true;
}
// Puts fd into non-blocking mode and returns the result of the fcntl call
// (-1 on failure).
// The current file status flags are read with F_GETFL so that O_NONBLOCK is
// added to them. F_GETFD, used previously, returns the descriptor flags
// (FD_CLOEXEC) instead, so the existing status flags were overwritten.
int WebServer::SetFdNonblock(int fd) {
    assert(fd > 0);
    const int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) {
        return -1;
    }
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}