changeset 95:3b398a887b95

Use a sqlite3 database to store the blob data if available. This is much more performant than using a filesystem when we are dealing with a large number of blobs. If sqlite3 is not available, then fall back to writing to the filesystem. In both cases, the blob data is compressed before writing to save space. A new option has also been added to specify a path for persistent blob data. This is only really important for large continuous iterations where the source data has no concept of export marks and thus only gets bigger. What we gain here is a reduction in the write load on the disk.
author Roy Marples <roy@marples.name>
date Thu, 21 Jan 2021 23:59:21 +0000
parents b18c5670f6c0
children 7eb15a5c4cad
files hgext3rd/fastimport/__init__.py hgext3rd/fastimport/hgimport.py
diffstat 2 files changed, 91 insertions(+), 32 deletions(-) [+]
line wrap: on
line diff
--- a/hgext3rd/fastimport/__init__.py	Thu Jan 21 22:52:55 2021 +0000
+++ b/hgext3rd/fastimport/__init__.py	Thu Jan 21 23:59:21 2021 +0000
@@ -32,7 +32,8 @@
 @command(b'fastimport',
          [(b'', b'branchsort', None, _(b'try to sort changesets by branches')),
           (b'', b'datesort', None, _(b'try to sort changesets by date')),
-          (b'', b'sourcesort', None, _(b'preserve source changesets order'))],
+          (b'', b'sourcesort', None, _(b'preserve source changesets order')),
+          (b'', b'blobpath', b'', _(b'path for persistent blob data'))],
          _(b'hg fastimport SOURCE ...'),
           norepo=False)
 
@@ -55,6 +56,8 @@
     if not sources:
         sources = (b'-')
 
+    opts = pycompat.byteskwargs(opts)
+
     # assume fastimport metadata (usernames, commit messages) are
     # encoded UTF-8
     convcmd.orig_encoding = encoding.encoding
@@ -62,9 +65,8 @@
 
     # sink is the current repo, src is the list of fastimport streams
     destc = hg.mercurial_sink(ui, b'hg', repo.root)
-    srcc = fastimport_source(ui, b'fastimport', repo, sources)
+    srcc = fastimport_source(ui, b'fastimport', repo, sources, opts[b'blobpath'])
 
-    opts = pycompat.byteskwargs(opts)
     defaultsort = b'branchsort'          # for efficiency and consistency
     sortmodes = (b'branchsort', b'datesort', b'sourcesort')
     sortmode = [m for m in sortmodes if opts.get(m)]
--- a/hgext3rd/fastimport/hgimport.py	Thu Jan 21 22:52:55 2021 +0000
+++ b/hgext3rd/fastimport/hgimport.py	Thu Jan 21 23:59:21 2021 +0000
@@ -25,20 +25,31 @@
 import stat
 import sys
 
+from contextlib import closing
+
 from hgext.convert import common, hg as converthg
-from mercurial import util
+from mercurial import util, zstd
 from mercurial.i18n import _
 
 from .vendor.python_fastimport import processor, parser
 
+# When dealing with many blobs, a database is more efficient
+# than a filesystem. However, it's not necessary for us to work.
+try:
+    import sqlite3
+    have_sqlite = True
+except (ImportError):
+    have_sqlite = False
+
+
 class fastimport_source(common.converter_source):
     """Interface between the fastimport processor below and Mercurial's
     normal conversion infrastructure.
     """
-    def __init__(self, ui, repotype, repo, sources):
+    def __init__(self, ui, repotype, repo, sources, blobpath):
         self.ui = ui
         self.sources = sources
-        self.processor = HgImportProcessor(ui, repo)
+        self.processor = HgImportProcessor(ui, repo, blobpath)
         self.parsed = False
         self.repotype = repotype
 
@@ -129,7 +140,7 @@
 
     tagprefix = b"refs/tags/"
 
-    def __init__(self, ui, repo):
+    def __init__(self, ui, repo, blobpath):
         super(HgImportProcessor, self).__init__()
         self.ui = ui
         self.repo = repo
@@ -145,7 +156,15 @@
         self.tags = []                  # list of (tag, mark) tuples
 
         self.numblobs = 0               # for progress reporting
-        self.blobdir = None
+        if blobpath:
+            self.blobpath = blobpath
+            self.blobpersist = True
+        else:
+            self.blobpath = os.path.join(self.repo.root, b".hg", b"blobs")
+            self.blobpersist = False
+        self.blobdb = None
+        self.compressor = zstd.ZstdCompressor(level = 6) # compress the blob
+        self.decompressor = zstd.ZstdDecompressor()      # decompress the blob
 
     def setup(self):
         """Setup before processing any streams."""
@@ -153,9 +172,25 @@
 
     def teardown(self):
         """Cleanup after processing all streams."""
-        if self.blobdir and os.path.exists(self.blobdir):
-            self.ui.debug(b"Removing blob dir %s ...\n" % self.blobdir)
-            shutil.rmtree(self.blobdir)
+        if self.blobdb:
+            self.blobdb.commit()
+            self.blobdb.close()
+            self.blobdb = None
+            if self.blobpersist:
+                self.ui.debug(b"Persisting blob database %s ...\n"
+                              % self.blobpath)
+            else:
+                self.ui.debug(b"Removing blob database %s ...\n"
+                              % self.blobpath)
+                os.remove(self.blobpath)
+        elif os.path.exists(self.blobpath):
+            if self.blobpersist:
+                self.ui.debug(b"Persisting blob directory %s ...\n"
+                              % self.blobpath)
+            else:
+                self.ui.debug(b"Removing blob directory %s ...\n"
+                          % self.blobpath)
+                shutil.rmtree(self.blobpath)
 
     def progress_handler(self, cmd):
         self.ui.write(b"Progress: %s\n" % cmd.message)
@@ -163,36 +198,58 @@
     def blob_handler(self, cmd):
         self.writeblob(cmd.id, cmd.data)
 
-    def _getblobfilename(self, blobid):
-        if self.blobdir is None:
-            raise RuntimeError("no blobs seen, so no blob directory created")
-        # XXX should escape ":" for windows
-        return os.path.join(self.blobdir, b"blob-" + blobid)
+    def _openblobdb(self):
+        self.blobdb = sqlite3.connect(self.blobpath)
+        self.blobdb.text_factory = str
+        self.blobdb.execute("CREATE TABLE IF NOT EXISTS blob(id varchar PRIMARY KEY NOT NULL, content blob NOT NULL)")
 
-    def getblob(self, fileid):
-        (commitid, blobid) = fileid
-        f = open(self._getblobfilename(blobid), "rb")
-        try:
-            return f.read()
-        finally:
-            f.close()
+    def _getblobfilename(self, blobid):
+        # XXX should escape ":" for windows
+        return os.path.join(self.blobpath, b"blob-" + blobid)
 
     def writeblob(self, blobid, data):
-        if self.blobdir is None:        # no blobs seen yet
-            self.blobdir = os.path.join(self.repo.root, b".hg", b"blobs")
-            os.mkdir(self.blobdir)
+        if self.compressor:
+            data = self.compressor.compress(data)
 
-        fn = self._getblobfilename(blobid)
-        blobfile = open(fn, "wb")
-        #self.ui.debug("writing blob %s to %s (%d bytes)\n"
-        #              % (blobid, fn, len(data)))
-        blobfile.write(data)
-        blobfile.close()
+        if have_sqlite:
+            if self.blobdb is None:
+                self._openblobdb()
+            self.blobdb.execute("INSERT OR IGNORE INTO blob(id, content) VALUES(?, ?)",
+                                (blobid, data))
+        else:
+            fn = self._getblobfilename(blobid)
+            if not os.path.isfile(fn):
+                if not os.path.exists(self.blobpath):
+                    os.mkdir(self.blobpath)
+                with open(fn, "wb") as f:
+                    f.write(data)
 
         self.numblobs += 1
         if self.numblobs % 500 == 0:
             self.ui.status(b"%d blobs read\n" % self.numblobs)
 
+    def getblob(self, fileid):
+        (commitid, blobid) = fileid
+
+        if have_sqlite:
+            if self.blobdb is None:
+                self._openblobdb()
+            with closing(self.blobdb.cursor()) as c:
+                c.execute("SELECT content FROM blob WHERE id=?", (blobid,))
+                data, = c.fetchone()
+        else:
+            fn = self._getblobfilename(blobid)
+            if os.path.isfile(fn):
+                with open(fn, "rb") as f:
+                    data = f.read()
+
+        if not data:
+            raise RuntimeError("missing blob %s for fileid %s"
+                               % (blobid, fileid))
+        if self.decompressor:
+            data = self.decompressor.decompress(data, 10**8)
+        return data
+
     def getmode(self, name, fileid):
         (commitid, blobid) = fileid
         return self.filemodes[commitid][name]