Mercurial > hg > hg-fastimport
diff hgext3rd/fastimport/hgimport.py @ 95:3b398a887b95
Use a sqlite3 database to store the blob data if available
This is much more performant than using a filesystem when we are
dealing with a large number of blobs.
If sqlite3 is not available, then fall back to writing to the filesystem.
In both cases, the blob data is compressed before writing to save space.
A new option has also been added to specify a path for persistent blob data.
This is only really important for large continuous iterations where the
source data has no concept of export marks and thus only gets bigger.
What we gain here is a reduction in the write load on the disk.
| author | Roy Marples <roy@marples.name> |
|---|---|
| date | Thu, 21 Jan 2021 23:59:21 +0000 |
| parents | 2ce33511de87 |
| children | cde0e1d24e58 |
line wrap: on
line diff
--- a/hgext3rd/fastimport/hgimport.py Thu Jan 21 22:52:55 2021 +0000 +++ b/hgext3rd/fastimport/hgimport.py Thu Jan 21 23:59:21 2021 +0000 @@ -25,20 +25,31 @@ import stat import sys +from contextlib import closing + from hgext.convert import common, hg as converthg -from mercurial import util +from mercurial import util, zstd from mercurial.i18n import _ from .vendor.python_fastimport import processor, parser +# When dealing with many blobs, a database is more efficient +# than a filesystem. However, it's not necessary for us to work. +try: + import sqlite3 + have_sqlite = True +except (ImportError): + have_sqlite = False + + class fastimport_source(common.converter_source): """Interface between the fastimport processor below and Mercurial's normal conversion infrastructure. """ - def __init__(self, ui, repotype, repo, sources): + def __init__(self, ui, repotype, repo, sources, blobpath): self.ui = ui self.sources = sources - self.processor = HgImportProcessor(ui, repo) + self.processor = HgImportProcessor(ui, repo, blobpath) self.parsed = False self.repotype = repotype @@ -129,7 +140,7 @@ tagprefix = b"refs/tags/" - def __init__(self, ui, repo): + def __init__(self, ui, repo, blobpath): super(HgImportProcessor, self).__init__() self.ui = ui self.repo = repo @@ -145,7 +156,15 @@ self.tags = [] # list of (tag, mark) tuples self.numblobs = 0 # for progress reporting - self.blobdir = None + if blobpath: + self.blobpath = blobpath + self.blobpersist = True + else: + self.blobpath = os.path.join(self.repo.root, b".hg", b"blobs") + self.blobpersist = False + self.blobdb = None + self.compressor = zstd.ZstdCompressor(level = 6) # compress the blob + self.decompressor = zstd.ZstdDecompressor() # decompress the blob def setup(self): """Setup before processing any streams.""" @@ -153,9 +172,25 @@ def teardown(self): """Cleanup after processing all streams.""" - if self.blobdir and os.path.exists(self.blobdir): - self.ui.debug(b"Removing blob dir %s ...\n" % self.blobdir) - 
shutil.rmtree(self.blobdir) + if self.blobdb: + self.blobdb.commit() + self.blobdb.close() + self.blobdb = None + if self.blobpersist: + self.ui.debug(b"Persisting blob database %s ...\n" + % self.blobpath) + else: + self.ui.debug(b"Removing blob database %s ...\n" + % self.blobpath) + os.remove(self.blobpath) + elif os.path.exists(self.blobpath): + if self.blobpersist: + self.ui.debug(b"Persisting blob directory %s ...\n" + % self.blobpath) + else: + self.ui.debug(b"Removing blob directory %s ...\n" + % self.blobpath) + shutil.rmtree(self.blobpath) def progress_handler(self, cmd): self.ui.write(b"Progress: %s\n" % cmd.message) @@ -163,36 +198,58 @@ def blob_handler(self, cmd): self.writeblob(cmd.id, cmd.data) - def _getblobfilename(self, blobid): - if self.blobdir is None: - raise RuntimeError("no blobs seen, so no blob directory created") - # XXX should escape ":" for windows - return os.path.join(self.blobdir, b"blob-" + blobid) + def _openblobdb(self): + self.blobdb = sqlite3.connect(self.blobpath) + self.blobdb.text_factory = str + self.blobdb.execute("CREATE TABLE IF NOT EXISTS blob(id varchar PRIMARY KEY NOT NULL, content blob NOT NULL)") - def getblob(self, fileid): - (commitid, blobid) = fileid - f = open(self._getblobfilename(blobid), "rb") - try: - return f.read() - finally: - f.close() + def _getblobfilename(self, blobid): + # XXX should escape ":" for windows + return os.path.join(self.blobpath, b"blob-" + blobid) def writeblob(self, blobid, data): - if self.blobdir is None: # no blobs seen yet - self.blobdir = os.path.join(self.repo.root, b".hg", b"blobs") - os.mkdir(self.blobdir) + if self.compressor: + data = self.compressor.compress(data) - fn = self._getblobfilename(blobid) - blobfile = open(fn, "wb") - #self.ui.debug("writing blob %s to %s (%d bytes)\n" - # % (blobid, fn, len(data))) - blobfile.write(data) - blobfile.close() + if have_sqlite: + if self.blobdb is None: + self._openblobdb() + self.blobdb.execute("INSERT OR IGNORE INTO blob(id, 
content) VALUES(?, ?)", + (blobid, data)) + else: + fn = self._getblobfilename(blobid) + if not os.path.isfile(fn): + if not os.path.exists(self.blobpath): + os.mkdir(self.blobpath) + with open(fn, "wb") as f: + f.write(data) self.numblobs += 1 if self.numblobs % 500 == 0: self.ui.status(b"%d blobs read\n" % self.numblobs) + def getblob(self, fileid): + (commitid, blobid) = fileid + + if have_sqlite: + if self.blobdb is None: + self._openblobdb() + with closing(self.blobdb.cursor()) as c: + c.execute("SELECT content FROM blob WHERE id=?", (blobid,)) + data, = c.fetchone() + else: + fn = self._getblobfilename(blobid) + if os.path.isfile(fn): + with open(fn, "rb") as f: + data = f.read() + + if not data: + raise RuntimeError("missing blob %s for fileid %s" + % (blobid, fileid)) + if self.decompressor: + data = self.decompressor.decompress(data, 10**8) + return data + def getmode(self, name, fileid): (commitid, blobid) = fileid return self.filemodes[commitid][name]
