diff hgext3rd/fastimport/hgimport.py @ 95:3b398a887b95

Use a sqlite3 database to store the blob data if available. This is much more performant than using a filesystem when we are dealing with a large number of blobs. If sqlite3 is not available, then fall back to writing to the filesystem. In both cases, the blob data is compressed before writing to save space. A new option has also been added to specify a path for persistent blob data. This is only really important for large continuous iterations where the source data has no concept of export marks and thus only gets bigger. What we gain here is a reduction in the write load on the disk.
author Roy Marples <roy@marples.name>
date Thu, 21 Jan 2021 23:59:21 +0000
parents 2ce33511de87
children cde0e1d24e58
line wrap: on
line diff
--- a/hgext3rd/fastimport/hgimport.py	Thu Jan 21 22:52:55 2021 +0000
+++ b/hgext3rd/fastimport/hgimport.py	Thu Jan 21 23:59:21 2021 +0000
@@ -25,20 +25,31 @@
 import stat
 import sys
 
+from contextlib import closing
+
 from hgext.convert import common, hg as converthg
-from mercurial import util
+from mercurial import util, zstd
 from mercurial.i18n import _
 
 from .vendor.python_fastimport import processor, parser
 
+# When dealing with many blobs, a database is more efficient
+# than a filesystem. However, it's not necessary for us to work.
+try:
+    import sqlite3
+    have_sqlite = True
+except (ImportError):
+    have_sqlite = False
+
+
 class fastimport_source(common.converter_source):
     """Interface between the fastimport processor below and Mercurial's
     normal conversion infrastructure.
     """
-    def __init__(self, ui, repotype, repo, sources):
+    def __init__(self, ui, repotype, repo, sources, blobpath):
         self.ui = ui
         self.sources = sources
-        self.processor = HgImportProcessor(ui, repo)
+        self.processor = HgImportProcessor(ui, repo, blobpath)
         self.parsed = False
         self.repotype = repotype
 
@@ -129,7 +140,7 @@
 
     tagprefix = b"refs/tags/"
 
-    def __init__(self, ui, repo):
+    def __init__(self, ui, repo, blobpath):
         super(HgImportProcessor, self).__init__()
         self.ui = ui
         self.repo = repo
@@ -145,7 +156,15 @@
         self.tags = []                  # list of (tag, mark) tuples
 
         self.numblobs = 0               # for progress reporting
-        self.blobdir = None
+        if blobpath:
+            self.blobpath = blobpath
+            self.blobpersist = True
+        else:
+            self.blobpath = os.path.join(self.repo.root, b".hg", b"blobs")
+            self.blobpersist = False
+        self.blobdb = None
+        self.compressor = zstd.ZstdCompressor(level = 6) # compress the blob
+        self.decompressor = zstd.ZstdDecompressor()      # decompress the blob
 
     def setup(self):
         """Setup before processing any streams."""
@@ -153,9 +172,25 @@
 
     def teardown(self):
         """Cleanup after processing all streams."""
-        if self.blobdir and os.path.exists(self.blobdir):
-            self.ui.debug(b"Removing blob dir %s ...\n" % self.blobdir)
-            shutil.rmtree(self.blobdir)
+        if self.blobdb:
+            self.blobdb.commit()
+            self.blobdb.close()
+            self.blobdb = None
+            if self.blobpersist:
+                self.ui.debug(b"Persisting blob database %s ...\n"
+                              % self.blobpath)
+            else:
+                self.ui.debug(b"Removing blob database %s ...\n"
+                              % self.blobpath)
+                os.remove(self.blobpath)
+        elif os.path.exists(self.blobpath):
+            if self.blobpersist:
+                self.ui.debug(b"Persisting blob directory %s ...\n"
+                              % self.blobpath)
+            else:
+                self.ui.debug(b"Removing blob directory %s ...\n"
+                          % self.blobpath)
+                shutil.rmtree(self.blobpath)
 
     def progress_handler(self, cmd):
         self.ui.write(b"Progress: %s\n" % cmd.message)
@@ -163,36 +198,58 @@
     def blob_handler(self, cmd):
         self.writeblob(cmd.id, cmd.data)
 
-    def _getblobfilename(self, blobid):
-        if self.blobdir is None:
-            raise RuntimeError("no blobs seen, so no blob directory created")
-        # XXX should escape ":" for windows
-        return os.path.join(self.blobdir, b"blob-" + blobid)
+    def _openblobdb(self):
+        self.blobdb = sqlite3.connect(self.blobpath)
+        self.blobdb.text_factory = str
+        self.blobdb.execute("CREATE TABLE IF NOT EXISTS blob(id varchar PRIMARY KEY NOT NULL, content blob NOT NULL)")
 
-    def getblob(self, fileid):
-        (commitid, blobid) = fileid
-        f = open(self._getblobfilename(blobid), "rb")
-        try:
-            return f.read()
-        finally:
-            f.close()
+    def _getblobfilename(self, blobid):
+        # XXX should escape ":" for windows
+        return os.path.join(self.blobpath, b"blob-" + blobid)
 
     def writeblob(self, blobid, data):
-        if self.blobdir is None:        # no blobs seen yet
-            self.blobdir = os.path.join(self.repo.root, b".hg", b"blobs")
-            os.mkdir(self.blobdir)
+        if self.compressor:
+            data = self.compressor.compress(data)
 
-        fn = self._getblobfilename(blobid)
-        blobfile = open(fn, "wb")
-        #self.ui.debug("writing blob %s to %s (%d bytes)\n"
-        #              % (blobid, fn, len(data)))
-        blobfile.write(data)
-        blobfile.close()
+        if have_sqlite:
+            if self.blobdb is None:
+                self._openblobdb()
+            self.blobdb.execute("INSERT OR IGNORE INTO blob(id, content) VALUES(?, ?)",
+                                (blobid, data))
+        else:
+            fn = self._getblobfilename(blobid)
+            if not os.path.isfile(fn):
+                if not os.path.exists(self.blobpath):
+                    os.mkdir(self.blobpath)
+                with open(fn, "wb") as f:
+                    f.write(data)
 
         self.numblobs += 1
         if self.numblobs % 500 == 0:
             self.ui.status(b"%d blobs read\n" % self.numblobs)
 
+    def getblob(self, fileid):
+        (commitid, blobid) = fileid
+
+        if have_sqlite:
+            if self.blobdb is None:
+                self._openblobdb()
+            with closing(self.blobdb.cursor()) as c:
+                c.execute("SELECT content FROM blob WHERE id=?", (blobid,))
+                data, = c.fetchone()
+        else:
+            fn = self._getblobfilename(blobid)
+            if os.path.isfile(fn):
+                with open(fn, "rb") as f:
+                    data = f.read()
+
+        if not data:
+            raise RuntimeError("missing blob %s for fileid %s"
+                               % (blobid, fileid))
+        if self.decompressor:
+            data = self.decompressor.decompress(data, 10**8)
+        return data
+
     def getmode(self, name, fileid):
         (commitid, blobid) = fileid
         return self.filemodes[commitid][name]