为了提供更灵活的输入输出控制,并让其支持更多的类型和格式,C++ 引入了输入输出流。C++ 的输入输出系统中提供了两个基类,分别是 ios_base
和 ios
。基于这两个基类实现了我们常用的标准输入输出流 istream
和 ostream
。同时,基于这两个流,C++ 提供了另外两种类型:文件输入输出流 fstream
以及字符串输入输出流 stringstream
。这些类之间的继承关系可以用下图来说明:
对于大多数 c++ 程序而言,使用系统提供的输入输出框架已经足够了。但是,对于想要根据需求改变流表现的使用者来说,了解如何定制流的过程至关重要。例如,你可能希望在你的项目中像标准输入输出一样来读取 tcp socket,或者希望像标准输入输出一样来封装一个对于 FILE* 的读取和写入,再或者你希望利用输入输出流的方式来操纵内存中的数据,这些都可以通过定制自己的流来实现。
我们都知道每一个输入输出流都会绑定相应的 buffer,也就是输入输出缓冲区。这个缓冲区就是基于上图中的 streambuf
类来定义的。可以看到,文件输入输出使用的是继承自 streambuf
的 filebuf
,而字符串流则是使用的 stringbuf
。事实上,streambuf 是输入输出系统中最关键的一部分,它负责提供缓冲功能,并提供”顺序读取设备”的抽象,也就是把数据刷新到外部设备中或者从外部设备中读取数据。而具体的流可以只负责进行格式化或者完成其他类型地工作。
现在假设我们有这样一个需求:用标准输入输出的方式来封装一个 TCP socket,也就是定义一个 BasicTcpStream,使其可以进行如下地操作:
std::string inputStr;
BasicTcpStream tcpInOut1(socket1);
BasicTcpStream tcpInOut2(socket2);
// 1. 从标准输入中接受字符串,然后写入到socket中
while(cin >> inputStr) {
tcpInOut1 << inputStr << endl;
}
// 2. 从一个socket中接收数据写入到另一个socket中
while(tcpInOut1 >> inputStr) {
tcpInOut2 << inputStr << endl;
}
注意这并不是一个无意义的需求,事实上 phxrpc 就实现了这个功能。为了达到目的,那么我们需要完成两步的工作:
- 按照需求自定义一个 buffer 类,该 buffer 一般需要继承自 streambuf,并且覆盖类中的部分虚函数实现。
- 定义一个相应的 stream 来使用该 buffer 类,也就是利用 buffer 类来同外部设备打交道,读取或者写入 socket。
自定义 streambuf
streambuf
是一个 traits class,由 basic_streambuf
所定义(具体什么是 traits class 以及为什么要这么定义 streambuf
,以后再说)。
template<
class CharT,
class Traits = std::char_traits<CharT>
> class basic_streambuf;
typedef basic_streambuf<char> streambuf;
typedef basic_streambuf<wchar_t> wstreambuf;
其中 streambuf 基于标准字符类型 char
,而 wstreambuf 基于宽字符类型 wchar_t
,这里我们的实例均基于 streambuf 所实现。streambuf 既定义了输出的操作也定义了输入操作,我们将分别介绍如何实现对于输入输出的定制。
用于输出的 streambuf
streambuf 使用三个指针来管理相应的输出缓冲区(缓冲区需要自行设置),分别由接口 pbase
,pptr
和 epptr
返回。其中 pbase
是缓冲区的基指针,指向缓冲区的第一个字节,epptr
是缓冲区的尾指针,指向其最后一个字节的下一个字节(类似于 iter.end()
的作用),而 pptr
指向缓冲区当前可用的位置,也就是pptr
之前都已经被数据所填充,如下图:
streambuf 定义的输出相关的函数主要有 sputc
和 sputn
,前者输出一个字符到缓冲区,并且将指针 pptr
向后移动一个字符,后者调用函数 xsputn
连续输出多个字符,xsputn
默认的实现就是多次调用 sputc
。由于缓冲区有限,当 pptr
指针向后移动满足 pptr() == epptr
时,说明缓冲区满了,这时将会调用函数 overflow
将数据写入到外部设备并清空缓冲区;清空缓冲区的方式则是调用 pbump
函数将指针 pptr
重置。我们可以通过如下的类来实现自定义的输出 buffer:
#include <iostream>
class TcpStreamBuf : public std::streambuf {
public:
TcpStreamBuf(int socket, size_t buf_size);
~TcpStreamBuf();
int overflow(int c); // 字符 c 是调用 overflow 时当前的字符
int sync(); // 将buffer中的内容刷新到外部设备,不管缓冲区是否满
private:
const size_t buf_size_;
int socket_;
char* pbuf_; // 输出缓冲区
};
我们在初始化时来申请 buffer 内存,并且通过 setp
函数来指定初始 pbase
以及 epptr
指针的位置:
TcpStreamBuf::TcpStreamBuf(int socket, size_t buf_size) :
buf_size_(buf_size), socket_(socket) {
assert(buf_size_ > 0);
pbuf_ = new char[buf_size_];
setp(pbuf_, pbuf_ + buf_size_); // set the pointers for output buf
}
// flush the data to the socket
int TcpStreamBuf::sync() {
int sent = 0;
int total = pptr() - pbase(); // data that can be flushed
while (sent < total) {
int ret = send(socket_, pbase()+sent, total-sent, 0);
if (ret > 0) sent += ret;
else {
return -1;
}
}
setp(pbase(), pbase() + buf_size_); // reset the buffer
pbump(0); // reset pptr to buffer head
return 0;
}
上面的构造函数和 sync 函数都比较容易理解。构造函数申请一块堆内存 pbuf
作为输出缓冲区,然后调用 setp
函数来设置 buffer 的头指针 pbase
和尾指针 epptr
。sync
函数强制将已经缓存的数据调用 send 发送出去,也就是刷新到外部设备。接下来我们看如何定义函数 overflow
。由于调用 overflow 时当前的缓冲区已经满了,因此 overflow 的参数 c 必须在缓冲区中的数据刷新到外部设备之后才能够放入到 buffer 中,否则 overflow 应该返回 eof。
int TcpStreamBuf::overflow(int c) {
if (-1 == sync()) {
return traits_type::eof();
}
else {
// put c into buffer after successful sync
if (!traits_type::eq_int_type(c, traits_type::eof())) {
sputc(traits_type::to_char_type(c));
}
// return eq_int_type(c, eof()) ? eof():c;
return traits_type::not_eof(c);
}
}
完成了上述这些步骤,我们基本上已经定义了一个可以用于输出的缓冲区 TcpStreamBuf
,接下来我们同样介绍一下,如何为该缓冲区类增加输入的功能。
用于输入的 streambuf
前面的需求中,希望 TcpStream 能够支持类似于 cin
的操作,也就是直接从 socket 中读取数据。这就要求我们定义的底层 TcpStreamBuf
需要支持输入操作。同管理输出缓冲区一样,streambuf 也使用三个指针,eback()
,gptr()
以及 egptr()
来指示输入缓冲区的开始字节,当前可用字节以及缓冲区尾的下一字节,如下图所示:
streambuf
类同样定义了如下几个函数来支持对于输入缓冲区的读取和管理:
sgetc
: 从输入缓冲区中读取一个字符;sbumpc
: 从输入缓冲区中读取一个字符,并将gptr()
指针向后移动一个位置;sgetn
: 从输入缓冲区中读取 n 个字符;sungetc
: 将缓冲区的gptr()
指针向前移动一个位置;sputbackc
: 将一个读取到的字符重新放回到输入缓冲区中;
与输出缓冲区不同的是,输入缓冲区需要额外提供 putback
操作,也就是将字符放回到输入缓冲区内。我们的 TcpStream 暂时不需要支持该功能,如果想了解如何添加 pytback
功能可以参考一下这里。
当输入缓冲区满足 gptr() == egptr()
时,表明缓冲区已经没有数据可以读取,函数 sgetc
将会调用 underflow 函数来从外部设备中拉取数据。不同于 sgetc
,sbumpc
在这种情况下则会调用 uflow
来实现拉取数据,并移动缓冲区读取指针的目的。默认情况下,uflow
会调用 underflow
,我们也无需额外实现 uflow
,但在特殊情况下(例如没有定义缓冲空间),则需要覆盖实现两个函数。
知道这些之后,我们就可以为 TcpStreamBuf 增加输入的功能。首先我们需要在构造时,为 TcpStreamBuf 申请一块空间用于输出缓冲区,并调用 setg
来设置相应的三个指针:
gbuf_ = new char[buf_size_];
setg(gbuf_, gbuf_, gbuf_);
需要注意的是,setg
比 setp
多一个参数,需要同时设置三个指针的指向位置。接下来我们需要定义当缓冲区已经没有数据时需要进行的操作,也就是 underflow
函数:
int TcpStreamBuf::underflow() {
int ret = recv(socket_, eback(), buf_size_, 0);
if (ret > 0) {
setg(eback(), eback(), eback() + ret);
return traits_type::to_int_type(*gptr());
} else {
return traits_type::eof();
}
}
当缓冲区没有数据时,函数 underflow
将直接从 socket 中读取数据到 gbuf_
中,然后设置尾指针为 eback() + ret
,设置 gptr
为 指向数据的第一个字节 eback
。同时返回当前可以读取的位置上的数据 *gptr()
。
上述的函实现已经基本满足一个可以用于读取写入 TCP socket 的 streambuf,接下来我们介绍第二步,也就是定义一个 stream 来使用 TcpStreamBuf。
自定义 stream
自定义的类 BasicTcpStream
需要继承于类 iostream
,并且将 TcpStreamBuf
作为底层的缓冲区使用:
class BasicTcpStream : public std::iostream {
public:
BasicTcpStream(int socket, size_t buf_size):
iostream(new TcpStreamBuf(socket, buf_size),
socket_(socket), buf_size_(buf_size) {
}
~BasicTcpStream();
private:
int socket_;
const size_t buf_size_;
};
我们决定不再为这个类增加更多的内容,仅仅将其作为一个简单的包装类来测试一下 TcpStreamBuf 的使用。我们首先需要编写一个简单的 client 和 server 来建立起 tcp 链接,然后通过类似于标准输入输出的方式来实现对于 socket 的写入和读取,其中 server.c 的写入代码如下:
for (;;) {
int clientfd = -1;
struct sockaddr_in addr;
socklen_t socklen = sizeof(addr);
clientfd = accept(sockfd, (struct sockaddr*) &addr, &socklen);
if (clientfd >= 0) {
tcpstream::BasicTcpStream tcpInOut(clientfd, BUF_SIZE);
tcpInOut << "Hello World" << std::endl;
for(char c = 'j'; c <= 'q'; c++) {
tcpInOut << c << std::endl;
}
int rc;
while(tcpInOut >> rc) {
std::cout << char(rc) << std::endl;
}
}
}
对于每一个接入的客户端,首先写回一个 HelloWorld,然后从字母 j
到字母 q
逐个写入,之后接受从客户端发过来的字符。客户端的代码如下:
tcpstream::BasicTcpStream tcpInOut(sockfd, BUF_SIZE);
char line[64] = { 0 };
if(tcpInOut.getline(line, 64).good()) {
std::cout << line << std::endl;
}
else {
std::cout << "receive error: " << line << std::endl;
}
char c;
while(tcpInOut >> c) {
std::cout << c << std::endl;
tcpInOut << std::toupper(c) << std::endl;
if(c == 'q') {
close(sockfd);
break;
}
}
也就是将服务端发送过来的字母全部转为大写并发送回去。全部代码在 custom_streambuf 可以找到(在 MacOS 下编译测试)。