Zookeeper源码阅读之五(FileTxnSnapLog)

it2024-11-21  14

本次阅读的zookeeper的版本是3.4.19 

zookeeper的数据的持久化主要是依赖两种文件,第一种是snapshot快照文件,一种是log日志文件。对于snapshot文件存储的是这个DataTree和session内存的快照,对于log文件则是顺序存储的事务日志。

下面图展示了这两种文件的格式,可以看到这两种文件的后缀都是一个16进制数,对于snapshot文件表示的是在开始这个快照时其对应的最后一个执行的事务id,这表示在做恢复时这个事务id之后的事务都需要重新执行,对于log文件来说这个16进制表示的则是这个日志的第一条事务日志对应的事务id。

这两种的日志文件是利用FileTxnSnapLog来进行维护的。

FileTxnSnapLog维护的属性

看到这个数据结构就可以发现,FileTxnSnapLog日志的处理逻辑其实都会交给TxnLog和SnapShot这两个两种日志的处理类来进行处理。

public class FileTxnSnapLog { private final File dataDir;//事务日志目录 private final File snapDir;//快照日志目录 private TxnLog txnLog;//事务日志处理对象 private SnapShot snapLog;//快照日志处理对象 }

TxnLog

先介绍一下TxnLog,它是一个接口,下面展示的是其提供的一些方法及其相关的一些作用。

public interface TxnLog { //一般情况下是创建一个新的文件来存储数据 void rollLog() throws IOException; //记录事务日志 boolean append(TxnHeader hdr, Record r) throws IOException; //获取一个从zxid的事务日志迭代器,这个主要是用于leader和follower同步,事务日志恢复这些 TxnIterator read(long zxid) throws IOException; //获取的是最后一条事务日志的zxid long getLastLoggedZxid() throws IOException; //将事务日志截断到对应的zxid,主要是在与leader进行同步时的操作 boolean truncate(long zxid) throws IOException; //对应的dbid long getDbId() throws IOException; //提交日志,对于文件主要是进行文件的flush操作将数据落盘 void commit() throws IOException; //关闭这个日志 void close() throws IOException; //日志迭代器接口 public interface TxnIterator { //获取当前事务日志头 TxnHeader getHeader(); //获取当前的事务日志 Record getTxn(); //下一条日志 boolean next() throws IOException; //关闭这个迭代器 void close() throws IOException; } }

FileTxnLog

fileTxnLog是对TxnLog这个接口的默认的实现,其对应的事务日志是通过File来进行存储的,下面图展示了一个事务日志文件存储的对应的事务日志的格式。

可以看到其主要分为三块,分别为FileHeader,TxnList和ZeroPad

FileHeader:  文件头,主要存储的是魔数,版本以及dbId等信息TxnList:这是事务日志主要存储的内容,存储的是一条一条事务日志ZeroPad: 前面可以看到每个事务日志文件是固定的64M的大小,对于还没有写到的数据则是用0来填充

下面详细讲一下Txn对应的数据结构,Txn则主要是通过checksum,txnLen,TxnHeader,Record和结束标识0x42来表示的

checksum: 校验和,主要是用来验证包的完整性txnLen: 整个TxnHeader,Record的数据长度TxnHeader: 表示的事务头相关的信息Record: 表示的是这个事务的操作数据SetDataRequest,DeleteRequest这些0x42: 结束标识

append方法

append方法是SyncRequestProcessor调用的方法,主要是追加一条事务日志到文件中,不过这个方法没有进行flush操作,这个flush操作主要是在commit方法中进行,commit主要是将数据从缓冲刷到磁盘中,实现真正的持久化。

public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException{ if (hdr != null) { if (hdr.getZxid() <= lastZxidSeen) { LOG.warn("Current zxid " + hdr.getZxid() + " is <= " + lastZxidSeen + " for " + hdr.getType()); } if (logStream==null) {//已经进行了rolllog logFileWrite = new File(logDir, ("log." + Long.toHexString(hdr.getZxid()))); fos = new FileOutputStream(logFileWrite); logStream=new BufferedOutputStream(fos); oa = BinaryOutputArchive.getArchive(logStream); FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId); fhdr.serialize(oa, "fileheader");//序列化fileHeader logStream.flush(); currentSize = fos.getChannel().position(); streamsToFlush.add(fos); } padFile(fos);//将文件填充到64M byte[] buf = Util.marshallTxnEntry(hdr, txn); if (buf == null || buf.length == 0) { throw new IOException("Faulty serialization for header " + "and txn"); } Checksum crc = makeChecksumAlgorithm(); crc.update(buf, 0, buf.length); oa.writeLong(crc.getValue(), "txnEntryCRC"); Util.writeTxnBytes(oa, buf);//数据写入到logStream return true; } return false; }

commit方法

commit方法是将数据从内存缓冲写入到磁盘中,只有调用了这个方法并且forceSync设置为true的情况下才能保证其日志真正地写入磁盘中。这个方法一般情况下也是SyncRequestProcessor 来进行调用的,主要是在一系列地append操作后再进行commit操作,因为commit操作一般情况下比较慢,所以在多次append操作后再进行commit以提升效率。

这个操作主要是先对当前的logStream进行flush操作,这个logStream是一个BufferedOutputStream,其flush主要是将其内存缓存写入到其对应的FileOutputStream中,不过这个操作只是将数据写入到操作系统,不能保证操作系统将数据写入到磁盘中,所以有下面的循环地flush操作,这个循环中先调用的是FileOutputStream的flush,这个操作内部没有进行工作,是下面的log.getChannel().force(false)保证了数据写入到磁盘中。

public synchronized void commit() throws IOException { if (logStream != null) { logStream.flush(); } for (FileOutputStream log : streamsToFlush) { log.flush();//这里操作其实没什么具体操作 if (forceSync) { long startSyncNS = System.nanoTime(); log.getChannel().force(false);//这里操作主要是将数据从操作系统缓冲刷到磁盘中 long syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS); if (syncElapsedMS > fsyncWarningThresholdMS) { } } } while (streamsToFlush.size() > 1) {//这里只留下一个文件流,即当前正写入的文件 streamsToFlush.removeFirst().close(); } }

rolllog方法

这个方法主要是将关闭当前的文件流,让append方法在追加新的事务日志时创建一个新的日志文件,可以看到它只是进行了flush操作(即将数据从内存中交给操作系统进行数据写入),具体地强制刷新则是在commit操作中进行的。

这个方法一般是在SyncRequestProcessor中进行的,一般是单个文件的事务日志文件写入了一定数量之后会进行rollLog并且将当前的内存DataTree生成快照。

public synchronized void rollLog() throws IOException { if (logStream != null) { this.logStream.flush(); //当前的BufferdOutputStream将数据从内存缓存写入到操作系统中 this.logStream = null; oa = null; } }

FileTxnIterator

这个类是一个Txn的迭代器,主要是用来迭代某个zxid之后事务日志的,主要是对于数据恢复用来读取快照之后的所有事务日志,以及follower同步leader时读取对应的follower的zxid之后的事务日志,以及在follower同步leader时可能需要进行的truncate截断操作。这个FileTxnIterator主要是维护了对应目录下的所有的文件,然后通过顺序读取文件中的事务日志来进行迭代操作。

FileTxnIterator#ini

这个方法是迭代器的初始化方法,主要是获取对应目录下的对应的事务日志文件,并且将其排序并去除掉对应日志文件的最后一条事务日志大于这个迭代器开始的zxid,然后走到第一个大于等于对应zxid的文件流位置。

void init() throws IOException { storedFiles = new ArrayList<File>(); List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false); for (File f: files) { if (Util.getZxidFromName(f.getName(), "log") >= zxid) { storedFiles.add(f); } else if (Util.getZxidFromName(f.getName(), "log") < zxid) { storedFiles.add(f); break; } } goToNextLog();//将storedFiles中的最后的一个文件作为当前的文件输入流 if (!next())//读取下一个事务日志 return; while (hdr.getZxid() < zxid) {//读取事务日志到对应的zxid if (!next()) return; } }

FileTxnIterator#next

next方法是读取下一条事务日志,主要是在当前文件读完后切换到下一个文件以及做crc检查的操作

public boolean next() throws IOException { if (ia == null) { return false; } try { long crcValue = ia.readLong("crcvalue"); byte[] bytes = Util.readTxnBytes(ia); if (bytes == null || bytes.length==0) { throw new EOFException("Failed to read " + logFile); } Checksum crc = makeChecksumAlgorithm(); crc.update(bytes, 0, bytes.length);//crc检查 if (crcValue != crc.getValue()) throw new IOException(CRC_ERROR); if (bytes == null || bytes.length == 0) return false; hdr = new TxnHeader(); record = SerializeUtils.deserializeTxn(bytes, hdr); } catch (EOFException e) {//没有数据则准备切换到下一个文件 inputStream.close(); inputStream = null; ia = null; hdr = null; if (!goToNextLog()) {//切换到下一个文件 return false; } return next();//读取下一条数据 } catch (IOException e) { inputStream.close(); throw e; } return true; }

SnapShot

snapshot也是一个接口,主要是序列化和反序列化的方法。

public interface SnapShot { long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException;//反序列化 void serialize(DataTree dt, Map<Long, Integer> sessions, File name) throws IOException; //序列化 File findMostRecentSnapshot() throws IOException;//找最近的快照 void close() throws IOException; }

FileTxnSnapLog

快照的数据主要存储的是fileHeader,session和tree这三块数据,其中FileHeader和事务日志差不多,主要就是magic,version,dbId这些,session主要存储的是当前的session,最后的tree存储的则是acls和节点数据,DataTree中的acl是用一个aclMap来进行映射的,所以在序列化时会将这个映射文件进行存储,然后存储的则是path和node对,其中path是绝对路径。

 

最新回复(0)