Mercurial > hg > hg-fastimport
comparison hgext3rd/fastimport/hgimport.py @ 95:3b398a887b95
Use a sqlite3 database to store the blob data if available
This is much more performant than using a filesystem when we are
dealing with a large number of blobs.
If sqlite3 is not available, then fall back to writing to the filesystem.
In both cases, the blob data is compressed before writing to save space.
A new option has also been added to specify a path for persistent blob data.
This is only really important for large continuous iterations where the
source data has no concept of export marks and thus only gets bigger.
What we gain here is a reduction in the write load on the disk.
| author | Roy Marples <roy@marples.name> |
|---|---|
| date | Thu, 21 Jan 2021 23:59:21 +0000 |
| parents | 2ce33511de87 |
| children | cde0e1d24e58 |
comparison
equal
deleted
inserted
replaced
| 94:b18c5670f6c0 | 95:3b398a887b95 |
|---|---|
| 23 import os | 23 import os |
| 24 import shutil | 24 import shutil |
| 25 import stat | 25 import stat |
| 26 import sys | 26 import sys |
| 27 | 27 |
| 28 from contextlib import closing | |
| 29 | |
| 28 from hgext.convert import common, hg as converthg | 30 from hgext.convert import common, hg as converthg |
| 29 from mercurial import util | 31 from mercurial import util, zstd |
| 30 from mercurial.i18n import _ | 32 from mercurial.i18n import _ |
| 31 | 33 |
| 32 from .vendor.python_fastimport import processor, parser | 34 from .vendor.python_fastimport import processor, parser |
| 35 | |
| 36 # When dealing with many blobs, a database is more efficient | |
| 37 # than a filesystem. However, it's not necessary for us to work. | |
| 38 try: | |
| 39 import sqlite3 | |
| 40 have_sqlite = True | |
| 41 except (ImportError): | |
| 42 have_sqlite = False | |
| 43 | |
| 33 | 44 |
| 34 class fastimport_source(common.converter_source): | 45 class fastimport_source(common.converter_source): |
| 35 """Interface between the fastimport processor below and Mercurial's | 46 """Interface between the fastimport processor below and Mercurial's |
| 36 normal conversion infrastructure. | 47 normal conversion infrastructure. |
| 37 """ | 48 """ |
| 38 def __init__(self, ui, repotype, repo, sources): | 49 def __init__(self, ui, repotype, repo, sources, blobpath): |
| 39 self.ui = ui | 50 self.ui = ui |
| 40 self.sources = sources | 51 self.sources = sources |
| 41 self.processor = HgImportProcessor(ui, repo) | 52 self.processor = HgImportProcessor(ui, repo, blobpath) |
| 42 self.parsed = False | 53 self.parsed = False |
| 43 self.repotype = repotype | 54 self.repotype = repotype |
| 44 | 55 |
| 45 # converter_source methods | 56 # converter_source methods |
| 46 | 57 |
| 127 | 138 |
| 128 class HgImportProcessor(processor.ImportProcessor): | 139 class HgImportProcessor(processor.ImportProcessor): |
| 129 | 140 |
| 130 tagprefix = b"refs/tags/" | 141 tagprefix = b"refs/tags/" |
| 131 | 142 |
| 132 def __init__(self, ui, repo): | 143 def __init__(self, ui, repo, blobpath): |
| 133 super(HgImportProcessor, self).__init__() | 144 super(HgImportProcessor, self).__init__() |
| 134 self.ui = ui | 145 self.ui = ui |
| 135 self.repo = repo | 146 self.repo = repo |
| 136 | 147 |
| 137 self.commitmap = {} # map commit ID (":1") to commit object | 148 self.commitmap = {} # map commit ID (":1") to commit object |
| 143 self.copies = {} # map commit id to dict of file copies | 154 self.copies = {} # map commit id to dict of file copies |
| 144 | 155 |
| 145 self.tags = [] # list of (tag, mark) tuples | 156 self.tags = [] # list of (tag, mark) tuples |
| 146 | 157 |
| 147 self.numblobs = 0 # for progress reporting | 158 self.numblobs = 0 # for progress reporting |
| 148 self.blobdir = None | 159 if blobpath: |
| 160 self.blobpath = blobpath | |
| 161 self.blobpersist = True | |
| 162 else: | |
| 163 self.blobpath = os.path.join(self.repo.root, b".hg", b"blobs") | |
| 164 self.blobpersist = False | |
| 165 self.blobdb = None | |
| 166 self.compressor = zstd.ZstdCompressor(level = 6) # compress the blob | |
| 167 self.decompressor = zstd.ZstdDecompressor() # decompress the blob | |
| 149 | 168 |
| 150 def setup(self): | 169 def setup(self): |
| 151 """Setup before processing any streams.""" | 170 """Setup before processing any streams.""" |
| 152 pass | 171 pass |
| 153 | 172 |
| 154 def teardown(self): | 173 def teardown(self): |
| 155 """Cleanup after processing all streams.""" | 174 """Cleanup after processing all streams.""" |
| 156 if self.blobdir and os.path.exists(self.blobdir): | 175 if self.blobdb: |
| 157 self.ui.debug(b"Removing blob dir %s ...\n" % self.blobdir) | 176 self.blobdb.commit() |
| 158 shutil.rmtree(self.blobdir) | 177 self.blobdb.close() |
| 178 self.blobdb = None | |
| 179 if self.blobpersist: | |
| 180 self.ui.debug(b"Persisting blob database %s ...\n" | |
| 181 % self.blobpath) | |
| 182 else: | |
| 183 self.ui.debug(b"Removing blob database %s ...\n" | |
| 184 % self.blobpath) | |
| 185 os.remove(self.blobpath) | |
| 186 elif os.path.exists(self.blobpath): | |
| 187 if self.blobpersist: | |
| 188 self.ui.debug(b"Persisting blob directory %s ...\n" | |
| 189 % self.blobpath) | |
| 190 else: | |
| 191 self.ui.debug(b"Removing blob directory %s ...\n" | |
| 192 % self.blobpath) | |
| 193 shutil.rmtree(self.blobpath) | |
| 159 | 194 |
| 160 def progress_handler(self, cmd): | 195 def progress_handler(self, cmd): |
| 161 self.ui.write(b"Progress: %s\n" % cmd.message) | 196 self.ui.write(b"Progress: %s\n" % cmd.message) |
| 162 | 197 |
| 163 def blob_handler(self, cmd): | 198 def blob_handler(self, cmd): |
| 164 self.writeblob(cmd.id, cmd.data) | 199 self.writeblob(cmd.id, cmd.data) |
| 165 | 200 |
| 201 def _openblobdb(self): | |
| 202 self.blobdb = sqlite3.connect(self.blobpath) | |
| 203 self.blobdb.text_factory = str | |
| 204 self.blobdb.execute("CREATE TABLE IF NOT EXISTS blob(id varchar PRIMARY KEY NOT NULL, content blob NOT NULL)") | |
| 205 | |
| 166 def _getblobfilename(self, blobid): | 206 def _getblobfilename(self, blobid): |
| 167 if self.blobdir is None: | |
| 168 raise RuntimeError("no blobs seen, so no blob directory created") | |
| 169 # XXX should escape ":" for windows | 207 # XXX should escape ":" for windows |
| 170 return os.path.join(self.blobdir, b"blob-" + blobid) | 208 return os.path.join(self.blobpath, b"blob-" + blobid) |
| 171 | |
| 172 def getblob(self, fileid): | |
| 173 (commitid, blobid) = fileid | |
| 174 f = open(self._getblobfilename(blobid), "rb") | |
| 175 try: | |
| 176 return f.read() | |
| 177 finally: | |
| 178 f.close() | |
| 179 | 209 |
| 180 def writeblob(self, blobid, data): | 210 def writeblob(self, blobid, data): |
| 181 if self.blobdir is None: # no blobs seen yet | 211 if self.compressor: |
| 182 self.blobdir = os.path.join(self.repo.root, b".hg", b"blobs") | 212 data = self.compressor.compress(data) |
| 183 os.mkdir(self.blobdir) | 213 |
| 184 | 214 if have_sqlite: |
| 185 fn = self._getblobfilename(blobid) | 215 if self.blobdb is None: |
| 186 blobfile = open(fn, "wb") | 216 self._openblobdb() |
| 187 #self.ui.debug("writing blob %s to %s (%d bytes)\n" | 217 self.blobdb.execute("INSERT OR IGNORE INTO blob(id, content) VALUES(?, ?)", |
| 188 # % (blobid, fn, len(data))) | 218 (blobid, data)) |
| 189 blobfile.write(data) | 219 else: |
| 190 blobfile.close() | 220 fn = self._getblobfilename(blobid) |
| 221 if not os.path.isfile(fn): | |
| 222 if not os.path.exists(self.blobpath): | |
| 223 os.mkdir(self.blobpath) | |
| 224 with open(fn, "wb") as f: | |
| 225 f.write(data) | |
| 191 | 226 |
| 192 self.numblobs += 1 | 227 self.numblobs += 1 |
| 193 if self.numblobs % 500 == 0: | 228 if self.numblobs % 500 == 0: |
| 194 self.ui.status(b"%d blobs read\n" % self.numblobs) | 229 self.ui.status(b"%d blobs read\n" % self.numblobs) |
| 230 | |
| 231 def getblob(self, fileid): | |
| 232 (commitid, blobid) = fileid | |
| 233 | |
| 234 if have_sqlite: | |
| 235 if self.blobdb is None: | |
| 236 self._openblobdb() | |
| 237 with closing(self.blobdb.cursor()) as c: | |
| 238 c.execute("SELECT content FROM blob WHERE id=?", (blobid,)) | |
| 239 data, = c.fetchone() | |
| 240 else: | |
| 241 fn = self._getblobfilename(blobid) | |
| 242 if os.path.isfile(fn): | |
| 243 with open(fn, "rb") as f: | |
| 244 data = f.read() | |
| 245 | |
| 246 if not data: | |
| 247 raise RuntimeError("missing blob %s for fileid %s" | |
| 248 % (blobid, fileid)) | |
| 249 if self.decompressor: | |
| 250 data = self.decompressor.decompress(data, 10**8) | |
| 251 return data | |
| 195 | 252 |
| 196 def getmode(self, name, fileid): | 253 def getmode(self, name, fileid): |
| 197 (commitid, blobid) = fileid | 254 (commitid, blobid) = fileid |
| 198 return self.filemodes[commitid][name] | 255 return self.filemodes[commitid][name] |
| 199 | 256 |
