网络编程基础
Socket 概述
Socket(套接字)是应用程序与网络协议栈之间的接口。Unix 的设计哲学是「一切皆文件」,Socket 也不例外——它是一种特殊的文件,通过文件描述符(fd)来操作。
Socket 类型
| 类型 | 说明 | 对应协议 |
|---|---|---|
| SOCK_STREAM | 面向字节流的可靠连接 | TCP |
| SOCK_DGRAM | 无连接的报文服务 | UDP |
| SOCK_RAW | 原始套接字,绕过传输层 | IP/ICMP |
地址结构
c
// IPv4 地址结构
struct sockaddr_in {
sa_family_t sin_family; // AF_INET
in_port_t sin_port; // 端口号(网络字节序)
struct in_addr sin_addr; // IP 地址
char sin_zero[8]; // 填充对齐
};
struct in_addr {
uint32_t s_addr; // IPv4 地址(网络字节序)
};
// 通用地址结构(用于函数参数)
struct sockaddr {
sa_family_t sin_family;
char sa_data[14];
};1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
python
# Python 中使用 tuple 表示地址
# (host, port)
# host: IP 地址字符串或域名
# port: 端口号(整数)
# IPv6 示例
# (host, port, flowinfo, scopeid)1
2
3
4
5
6
7
2
3
4
5
6
7
TCP 服务器与客户端
TCP 服务器模板
python
import socket
import threading
def handle_client(conn, addr):
"""处理单个客户端连接"""
print(f"[+] 客户端连接: {addr}")
try:
while True:
data = conn.recv(1024)
if not data:
break # 客户端关闭连接
print(f"[{addr}] 收到: {data.decode()}")
conn.sendall(data) # 回显数据
except ConnectionResetError:
print(f"[-] 客户端异常断开: {addr}")
finally:
conn.close()
print(f"[-] 连接关闭: {addr}")
def tcp_server(host='0.0.0.0', port=8888):
"""TCP 服务器"""
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_sock.bind((host, port))
server_sock.listen(128) # backlog 队列长度
print(f"[*] TCP 服务器监听 {host}:{port}")
try:
while True:
conn, addr = server_sock.accept() # 阻塞等待连接
thread = threading.Thread(target=handle_client, args=(conn, addr))
thread.daemon = True
thread.start()
except KeyboardInterrupt:
print("\n[*] 服务器关闭")
finally:
server_sock.close()
if __name__ == '__main__':
tcp_server()1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
TCP 客户端模板
python
import socket
def tcp_client(host='127.0.0.1', port=8888):
"""TCP 客户端"""
client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_sock.connect((host, port)) # 连接服务器
print(f"[*] 已连接服务器 {host}:{port}")
try:
while True:
msg = input("> ")
if msg.lower() in ('quit', 'exit'):
break
client_sock.sendall(msg.encode())
data = client_sock.recv(1024)
if data:
print(f"[服务器]: {data.decode()}")
except (ConnectionResetError, BrokenPipeError):
print("[-] 服务器已断开")
finally:
client_sock.close()
print("[*] 连接已关闭")
if __name__ == '__main__':
tcp_client()1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
TCP 进阶:非阻塞与 IO 多路复用
python
import socket
import select
def tcp_server_select(host='0.0.0.0', port=8888):
"""使用 select 实现 IO 多路复用"""
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_sock.bind((host, port))
server_sock.listen(128)
server_sock.setblocking(False) # 非阻塞模式
inputs = [server_sock]
outputs = []
message_queues = {}
print(f"[*] 服务器监听 {host}:{port} (select)")
try:
while inputs:
readable, writable, exceptional = select.select(
inputs, outputs, inputs
)
for sock in readable:
if sock is server_sock:
# 新连接
conn, addr = sock.accept()
conn.setblocking(False)
inputs.append(conn)
message_queues[conn] = []
print(f"[+] 新连接: {addr}")
else:
# 客户端数据
try:
data = sock.recv(1024)
if data:
print(f"[{sock.getpeername()}] 收到: {data.decode()}")
message_queues[sock].append(data)
if sock not in outputs:
outputs.append(sock)
else:
# 客户端关闭
print(f"[-] 客户端断开: {sock.getpeername()}")
inputs.remove(sock)
if sock in outputs:
outputs.remove(sock)
del message_queues[sock]
sock.close()
except ConnectionResetError:
pass
for sock in writable:
if message_queues.get(sock):
msg = message_queues[sock].pop(0)
try:
sock.sendall(msg)
except BrokenPipeError:
inputs.remove(sock)
outputs.remove(sock)
sock.close()
for sock in exceptional:
inputs.remove(sock)
if sock in outputs:
outputs.remove(sock)
sock.close()
del message_queues[sock]
finally:
server_sock.close()
# 更现代的方式:使用 selectors 模块(Python 3.4+)
import selectors
def tcp_server_selector(host='0.0.0.0', port=8888):
"""使用 selectors 实现 IO 多路复用"""
sel = selectors.DefaultSelector()
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_sock.bind((host, port))
server_sock.listen(128)
server_sock.setblocking(False)
def accept(sock, mask):
conn, addr = sock.accept()
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read)
def read(conn, mask):
try:
data = conn.recv(1024)
if data:
conn.sendall(data)
else:
sel.unregister(conn)
conn.close()
except ConnectionResetError:
sel.unregister(conn)
conn.close()
sel.register(server_sock, selectors.EVENT_READ, accept)
print(f"[*] 服务器监听 {host}:{port} (selectors)")
try:
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
except KeyboardInterrupt:
print("\n[*] 服务器关闭")
finally:
server_sock.close()1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
UDP 服务器与客户端
UDP 服务器
python
import socket
def udp_server(host='0.0.0.0', port=8888):
"""UDP 服务器"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
print(f"[*] UDP 服务器监听 {host}:{port}")
try:
while True:
data, addr = sock.recvfrom(4096) # 接收数据和发送者地址
print(f"[{addr}] 收到: {data.decode()}")
sock.sendto(f"回显: {data.decode()}".encode(), addr)
except KeyboardInterrupt:
print("\n[*] 服务器关闭")
finally:
sock.close()1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
UDP 客户端
python
import socket
def udp_client(host='127.0.0.1', port=8888):
"""UDP 客户端"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(5.0) # 设置超时
try:
while True:
msg = input("> ")
if msg.lower() in ('quit', 'exit'):
break
sock.sendto(msg.encode(), (host, port))
data, server = sock.recvfrom(4096)
print(f"[服务器]: {data.decode()}")
except socket.timeout:
print("[-] 请求超时")
except (ConnectionRefusedError, OSError) as e:
print(f"[-] 连接错误: {e}")
finally:
sock.close()
print("[*] 客户端关闭")1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
粘包处理
什么是粘包?
TCP 是面向字节流的协议,不保留消息边界。多个小数据包可能被合并成一个 TCP 段发送(粘包),或一个大数据包被拆分成多个 TCP 段(拆包)。应用层需要自行处理消息边界。
发送: "Hello" + "World" (两条消息)
接收: "HelloWorld" (粘包,分不清边界)1
2
2
解决方案
方案1:固定长度
所有消息都是固定长度,不足部分用空格或 \0 填充。
python
MSG_SIZE = 1024
def send_fixed(sock, msg: str):
"""发送固定长度的消息"""
msg_bytes = msg.encode().ljust(MSG_SIZE, b'\0')
sock.sendall(msg_bytes)
def recv_fixed(sock) -> str:
"""接收固定长度的消息"""
data = b''
while len(data) < MSG_SIZE:
chunk = sock.recv(MSG_SIZE - len(data))
if not chunk:
break
data += chunk
return data.rstrip(b'\0').decode()1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
方案2:固定头部 + 变长数据(推荐)
python
import struct
def send_message(sock, msg: str):
"""发送带长度前缀的消息"""
msg_bytes = msg.encode('utf-8')
# 4字节 unsigned int (网络字节序) 作为长度
header = struct.pack('!I', len(msg_bytes))
sock.sendall(header + msg_bytes)
def recv_message(sock) -> str:
"""接收带长度前缀的消息"""
# 先接收 4 字节头部
header = b''
while len(header) < 4:
chunk = sock.recv(4 - len(header))
if not chunk:
return None
header += chunk
length = struct.unpack('!I', header)[0]
# 再接收 body
body = b''
while len(body) < length:
chunk = sock.recv(length - len(body))
if not chunk:
return None
body += chunk
return body.decode('utf-8')
# 测试粘包处理
def tcp_echo_server(host='0.0.0.0', port=9999):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
sock.listen(5)
print(f"[*] Echo Server 监听 {host}:{port}")
conn, addr = sock.accept()
print(f"[+] 客户端: {addr}")
while True:
msg = recv_message(conn)
if msg is None:
print("[-] 客户端断开")
break
print(f"[收到]: {msg}")
send_message(conn, msg)
conn.close()
sock.close()1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
方案3:特殊分隔符
适用于文本协议,用 \n 或 \r\n 作为消息分隔符。
python
def send_line(sock, msg: str):
"""发送一行消息(以换行符结尾)"""
sock.sendall((msg + '\n').encode('utf-8'))
def recv_line(sock) -> str:
"""接收一行消息"""
buffer = b''
while True:
chunk = sock.recv(1)
if chunk == b'\n' or not chunk:
break
buffer += chunk
return buffer.decode('utf-8').rstrip('\r')1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
边界问题的根本原因
TCP 粘包/拆包的根源在于协议分层:
- 应用层:关注完整消息(如 HTTP 请求/响应)
- 传输层:关注字节流,不关心消息边界
- 网络层:关注 IP 包,也不关心消息边界
解决方案必须在应用层实现,因为只有应用层知道完整的消息格式。
完整示例:高性能 TCP 服务器
python
import socket
import selectors
import struct
import threading
class ByteBuffer:
"""字节缓冲区,用于处理粘包"""
def __init__(self):
self._data = b''
def append(self, data: bytes):
self._data += data
def read(self, n: int) -> bytes:
if len(self._data) < n:
return b''
result = self._data[:n]
self._data = self._data[n:]
return result
def read_until(self, delim: bytes) -> bytes:
"""读取直到遇到分隔符"""
idx = self._data.find(delim)
if idx == -1:
return b''
result = self._data[:idx]
self._data = self._data[idx + len(delim):]
return result
def __len__(self):
return len(self._data)
class Protocol:
"""自定义协议:4字节头部长度 + 变长数据"""
@staticmethod
def pack(msg: str) -> bytes:
body = msg.encode('utf-8')
return struct.pack('!I', len(body)) + body
@staticmethod
def unpack_header(data: bytes):
if len(data) < 4:
return None, None
length = struct.unpack('!I', data[:4])[0]
body = data[4:]
return length, body
class HighPerformanceTCPServer:
def __init__(self, host='0.0.0.0', port=8888, workers=4):
self.host = host
self.port = port
self.workers = workers
self.selector = selectors.DefaultSelector()
self.buffers = {} # fd -> ByteBuffer
self.lock = threading.Lock()
def start(self):
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_sock.bind((self.host, self.port))
server_sock.listen(256)
server_sock.setblocking(False)
self.selector.register(server_sock, selectors.EVENT_READ, self.accept)
print(f"[*] 高性能 TCP 服务器启动 {self.host}:{self.port}")
print(f"[*] Workers: {self.workers}")
self._accept_loop()
def accept(self, sock, mask):
conn, addr = sock.accept()
conn.setblocking(False)
with self.lock:
self.buffers[conn.fileno()] = ByteBuffer()
self.selector.register(conn, selectors.EVENT_READ, self.read)
def read(self, conn, mask):
try:
data = conn.recv(4096)
if not data:
self._close(conn)
return
with self.lock:
buf = self.buffers.get(conn.fileno())
if not buf:
return
buf.append(data)
self._process_buffer(conn)
except Exception as e:
print(f"[-] 读取错误: {e}")
self._close(conn)
def _process_buffer(self, conn):
with self.lock:
buf = self.buffers.get(conn.fileno())
if not buf:
return
length, body = Protocol.unpack_header(buf._data)
if length is None:
return
if len(body) < length:
return
# 完整消息到达
message = body[:length].decode('utf-8')
self._handle_message(conn, message)
# 保留剩余数据
buf._data = buf._data[4 + length:]
# 递归处理剩余数据(可能有多条消息粘在一起)
if len(buf._data) >= 4:
self._process_buffer(conn)
def _handle_message(self, conn, message: str):
"""处理消息(可被子类重写)"""
print(f"[{conn.getpeername()}] {message}")
# 回显
conn.sendall(Protocol.pack(f"Echo: {message}"))
def _close(self, conn):
with self.lock:
self.buffers.pop(conn.fileno(), None)
try:
self.selector.unregister(conn)
conn.close()
except:
pass
def _accept_loop(self):
try:
while True:
events = self.selector.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
except KeyboardInterrupt:
pass
finally:
self.selector.close()
if __name__ == '__main__':
server = HighPerformanceTCPServer(port=8888)
server.start()1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
常见问题与调试
查看端口占用
bash
# Linux: 查看端口占用
ss -tlnp | grep 8888
netstat -tlnp | grep 8888
# macOS
lsof -i :88881
2
3
4
5
6
2
3
4
5
6
TCP 保活机制
python
# 开启 TCP Keep-Alive
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# 设置保活参数(Linux)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60) # 空闲 60s 后开始探测
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10) # 探测间隔 10s
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3) # 最多探测 3 次1
2
3
4
5
6
7
2
3
4
5
6
7
超时设置
python
# 设置读取超时
sock.settimeout(30.0)
# 设置连接超时
sock.settimeout(5.0)
sock.connect((host, port))1
2
3
4
5
6
2
3
4
5
6
[[返回 网络首页|../index]]