MyDB-开源项目-笔记

[toc]

零、资料


一、MyDB 概述

MYDB 是一个 Java 实现的简单的数据库,部分原理参照自 MySQL、PostgreSQL 和 SQLite。实现了以下功能:

  • 数据的可靠性和数据恢复
  • 两段锁协议(2PL)实现可串行化调度
  • MVCC
  • 两种事务隔离级别(读提交和可重复读)
  • 死锁处理
  • 简单的表和字段管理
  • 简陋的 SQL 解析(因为懒得写词法分析和自动机,就弄得比较简陋)
  • 基于 socket 的 server 和 client

1.1、整体结构

  • 前端:读取输入,发给后端执行。
  • 后端:分析、执行。
  • 前后端的交互:socket

五个模块:

  • Transaction Manager(TM) :通过 XID 文件维护事务状态,提供查询某个事务状态的接口。
  • Data Manager(DM):分页管理 DB 文件和日志文件,并进行缓存,抽取 DataItem 供上层调用,提供缓存。
  • Version Manager(VM):基于两段锁协议 实现了调度序列的 可串行化,并实现了 MVCC 以消除读写阻塞。同时实现了两种隔离级别
  • Index Manager(IM):基于 B+ 树的索引,目前 where 只支持已索引字段。
  • Table Manager(TBM):对字段和表的管理,解析 SQL 语句,并根据语句操作表。

模块的实现顺序是: TM -> DM -> VM -> IM -> TBM

即:由下向上。

模块依赖图

1.2、代码运行

(1)在 pom.xml 中调整编译版本,如果导入 IDE,则更改项目的编译版本以适应你的 JDK

(2)编译源码:mvn compile

(3)在指定的路径/tmp/mydb下创建数据库:

1
mvn exec:java -Dexec.mainClass="top.guoziyang.mydb.backend.Launcher" -Dexec.args="-create /tmp/mydb"

(4)以默认参数启动数据库服务(启动在本机的 9999 端口):

1
mvn exec:java -Dexec.mainClass="top.guoziyang.mydb.backend.Launcher" -Dexec.args="-open /tmp/mydb"

(5)启动客户端连接数据库:

1
mvn exec:java -Dexec.mainClass="top.guoziyang.mydb.client.Launcher"

二、事务管理器 TM 的实现

2.1、XID 文件-简介

事务管理器TM,是通过 XID 文件 来维护事务状态的。

即:所有的 XID 都存放在XID 文件

XID 文件的规则:

  1. 在 MyDB 项目中,每个事务都有一个 XID 作为唯一标识
  2. 事务管理器 TM 维护了一个存放了所有 XID 的文件,用于记录事务状态。
  3. XID 0时,表示可以在没有申请事务时使用,XID 0 的事务的状态永远是 committed
  4. XID 正常从1开始,自增。
  5. XID 的存放位置:第(xid-1)+8字节

事务的状态:

  • active,正在进行,尚未结束。

  • committed,已提交。

  • aborted,已撤销(回滚)。

XID 文件的结构

2.2、事务管理器的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package backend.tm;

/**
* @author cyw
*/
public interface TransactionManager {

/**
* 开始事务
*
* @return
*/
long begin();

/**
* 提交事务
*
* @param xid
*/
void commit(long xid);

/**
* 回滚事务
*
* @param xid
*/
void abort(long xid);

/**
* 取消事务
*
* @return
*/
void close();

/**
* 判断是否开启事务
* @param xid
* @return
*/
boolean isActive(long xid);

/**
* 判断是否提交事务
* @param xid
* @return
*/
boolean isCommitted(long xid);

/**
* 判断是否回滚事务
* @param xid
* @return
*/
boolean isAborted(long xid);


/**
* 创建一个 xid 文件并创建 TM
* @param xid
* @return
*/
public static TransactionManagerImpl create(String path) {
File f = new File(path+TransactionManagerImpl.XID_SUFFIX);
try {
if(!f.createNewFile()) {
Panic.panic(Error.FileExistsException);
}
} catch (Exception e) {
Panic.panic(e);
}
if(!f.canRead() || !f.canWrite()) {
Panic.panic(Error.FileCannotRWException);
}

FileChannel fc = null;
RandomAccessFile raf = null;
try {
raf = new RandomAccessFile(f, "rw");
fc = raf.getChannel();
} catch (FileNotFoundException e) {
Panic.panic(e);
}

// 写空XID文件头
ByteBuffer buf = ByteBuffer.wrap(new byte[TransactionManagerImpl.LEN_XID_HEADER_LENGTH]);
try {
fc.position(0);
fc.write(buf);
} catch (IOException e) {
Panic.panic(e);
}

return new TransactionManagerImpl(raf, fc);
}


/**
* 从一个已有的 xid文件 来创建 TM
* @param path
* @return
*/
public static TransactionManagerImpl open(String path) {
File f = new File(path+TransactionManagerImpl.XID_SUFFIX);
if(!f.exists()) {
Panic.panic(Error.FileNotExistsException);
}
if(!f.canRead() || !f.canWrite()) {
Panic.panic(Error.FileCannotRWException);
}

FileChannel fc = null;
RandomAccessFile raf = null;
try {
raf = new RandomAccessFile(f, "rw");
fc = raf.getChannel();
} catch (FileNotFoundException e) {
Panic.panic(e);
}

return new TransactionManagerImpl(raf, fc);
}

}

2.3、事务管理器的接口实现类

(1)常量配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* XID文件头长度 : 8 Byte
*/
private static final int LEN_XID_HEADER_LENGTH = 8;

/**
* 每个事务的大小:1 Byte
*/
private static final int XID_FIELD_SIZE = 1;

/**
* 事务的三个状态:激活、已提交、已取消
*/
private static final byte FIELD_TRAN_ACTIVE = 0;
private static final byte FIELD_TRAN_COMMITTED = 1;
private static final byte FIELD_TRAN_ABORTED = 2;


/**
* 超级事务 XID 0,状态永远为已提交
*/
public static final long SUPER_XID = 0;

/**
* XID 文件的后缀名
*/
static final String XID_SUFFIX = ".xid";

(2)异常类的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package common;

/**
* @author cyw
*/
public class Error {
/**
* common
*/
public static final Exception CacheFullException = new RuntimeException("缓存已满!");
public static final Exception FileExistsException = new RuntimeException("文件已存在!");
public static final Exception FileNotExistsException = new RuntimeException("文件不存在!");
public static final Exception FileCannotRWException = new RuntimeException("文件不可读写!");

/**
* tm
*/
public static final Exception BadXIDFileException = new RuntimeException("错误的 XID 文件!");


}

(3)工具类的代码:

异常处理-工具类:

1
2
3
4
5
6
7
// 发生异常时退出
public class Panic {
public static void panic(Exception err) {
err.printStackTrace();
System.exit(1);
}
}

数据类型解析-工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package backend.utils;

import java.nio.ByteBuffer;

/**
* @author cyw
*/
public class Parser {

// 解析为 long 类型
public static long parse2Long(byte[] buf) {
ByteBuffer buffer = ByteBuffer.wrap(buf, 0, 8);
return buffer.getLong();
}

}

(4)成员变量的代码:

1
2
3
4
5
6
7
8
// import java.io.RandomAccessFile;
// import java.nio.channels.FileChannel;
// import java.util.concurrent.locks.Lock;

private RandomAccessFile file;
private FileChannel fc;
private long xidCounter;
private Lock counterLock;

(5)获取当前事务在XID文件中的存储位置 :

1
2
3
4
5
6
7
/**
* 根据事务 xid 取得其在 xid文件中对应的位置
* @return xid
*/
private long getXidPosition(long xid) {
return LEN_XID_HEADER_LENGTH + (xid-1)*XID_FIELD_SIZE;
}

(6)检查 XID 文件头(事务个数):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 检查 XID 文件是否合法
* 读取 XID_FILE_HEADER中的 xidCounter,根据它计算文件的理论长度,对比实际长度
*/
private void checkXIDCounter() {
long fileLen = 0;
try {
fileLen = file.length();
} catch (IOException e) {
Panic.panic(MyError.BadXIDFileException);
}

if(fileLen < LEN_XID_HEADER_LENGTH) {
Panic.panic(MyError.BadXIDFileException);
}

ByteBuffer buf = ByteBuffer.allocate(LEN_XID_HEADER_LENGTH);
try {
fc.position(0);
fc.read(buf);
} catch (IOException e) {
Panic.panic(e);
}
// 读取xid文件的文件头部(前8字节存储了xid的个数,即事务个数)
this.xidCounter = Parser.parse2Long(buf.array());
// 找到最后一个事务的xid的存储位置
long end = getXidPosition(this.xidCounter + 1);
if(end != fileLen) {
Panic.panic(MyError.BadXIDFileException);
}
}

(7)检查事务的状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 检测XID事务是否处于status状态
private boolean checkXID(long xid, byte status) {
long offset = getXidPosition(xid);
ByteBuffer buf = ByteBuffer.wrap(new byte[XID_FIELD_SIZE]);
try {
fc.position(offset);
fc.read(buf);
} catch (IOException e) {
Panic.panic(e);
}
return buf.array()[0] == status;
}

@Override
public boolean isActive(long xid) {
if(xid == SUPER_XID) return false;
return checkXID(xid, FIELD_TRAN_ACTIVE);
}

@Override
public boolean isCommitted(long xid) {
if(xid == SUPER_XID) return true;
return checkXID(xid, FIELD_TRAN_COMMITTED);
}

@Override
public boolean isAborted(long xid) {
if(xid == SUPER_XID) return false;
return checkXID(xid, FIELD_TRAN_ABORTED);
}

(8)开启事务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 开始一个事务,并返回XID
public long begin() {
counterLock.lock();
try {
long xid = xidCounter + 1;
updateXID(xid, FIELD_TRAN_ACTIVE);
incrXIDCounter();
return xid;
} finally {
counterLock.unlock();
}
}

// 更新xid事务的状态为status
private void updateXID(long xid, byte status) {
long offset = getXidPosition(xid);
byte[] tmp = new byte[XID_FIELD_SIZE];
tmp[0] = status;
ByteBuffer buf = ByteBuffer.wrap(tmp);
try {
fc.position(offset);
fc.write(buf);
} catch (IOException e) {
Panic.panic(e);
}
try {
fc.force(false);
} catch (IOException e) {
Panic.panic(e);
}
}

// 将XID加一,并更新XID Header
private void incrXIDCounter() {
xidCounter ++;
ByteBuffer buf = ByteBuffer.wrap(Parser.long2Byte(xidCounter));
try {
fc.position(0);
fc.write(buf);
} catch (IOException e) {
Panic.panic(e);
}
try {
fc.force(false);
} catch (IOException e) {
Panic.panic(e);
}
}

(9)提交事务:

1
2
3
4
@Override
public void commit(long xid) {
updateXID(xid, FIELD_TRAN_COMMITTED);
}

(10)回滚事务:

1
2
3
4
@Override
public void abort(long xid) {
updateXID(xid, FIELD_TRAN_ABORTED);
}

(11)关闭事务:

1
2
3
4
@Override
public void abort(long xid) {
updateXID(xid, FIELD_TRAN_ABORTED);
}

三、数据管理器 DM 的实现