comparison hgext3rd/fastimport/hgimport.py @ 95:3b398a887b95

Use a sqlite3 database to store the blob data if available. This is much more performant than using a filesystem when we are dealing with a large number of blobs. If sqlite3 is not available, then fall back to writing to the filesystem. In both cases, the blob data is compressed before writing to save space. A new option has also been added to specify a path for persistent blob data. This is only really important for large continuous iterations where the source data has no concept of export marks and thus only gets bigger. What we gain here is a reduction in the write load on the disk.
author Roy Marples <roy@marples.name>
date Thu, 21 Jan 2021 23:59:21 +0000
parents 2ce33511de87
children cde0e1d24e58
comparison
equal deleted inserted replaced
94:b18c5670f6c0 95:3b398a887b95
23 import os 23 import os
24 import shutil 24 import shutil
25 import stat 25 import stat
26 import sys 26 import sys
27 27
28 from contextlib import closing
29
28 from hgext.convert import common, hg as converthg 30 from hgext.convert import common, hg as converthg
29 from mercurial import util 31 from mercurial import util, zstd
30 from mercurial.i18n import _ 32 from mercurial.i18n import _
31 33
32 from .vendor.python_fastimport import processor, parser 34 from .vendor.python_fastimport import processor, parser
35
# A database copes with a very large number of blobs far better than a
# filesystem does, but sqlite3 is optional: we degrade gracefully.
try:
    import sqlite3
except ImportError:
    have_sqlite = False
else:
    have_sqlite = True
43
33 44
34 class fastimport_source(common.converter_source): 45 class fastimport_source(common.converter_source):
35 """Interface between the fastimport processor below and Mercurial's 46 """Interface between the fastimport processor below and Mercurial's
36 normal conversion infrastructure. 47 normal conversion infrastructure.
37 """ 48 """
def __init__(self, ui, repotype, repo, sources, blobpath):
    """Wire a fast-import processor into Mercurial's convert machinery.

    blobpath, when given, names a persistent location for blob data;
    it is forwarded verbatim to HgImportProcessor.
    """
    self.ui = ui
    self.repotype = repotype
    self.sources = sources
    self.parsed = False
    self.processor = HgImportProcessor(ui, repo, blobpath)
44 55
45 # converter_source methods 56 # converter_source methods
46 57
127 138
128 class HgImportProcessor(processor.ImportProcessor): 139 class HgImportProcessor(processor.ImportProcessor):
129 140
130 tagprefix = b"refs/tags/" 141 tagprefix = b"refs/tags/"
131 142
132 def __init__(self, ui, repo): 143 def __init__(self, ui, repo, blobpath):
133 super(HgImportProcessor, self).__init__() 144 super(HgImportProcessor, self).__init__()
134 self.ui = ui 145 self.ui = ui
135 self.repo = repo 146 self.repo = repo
136 147
137 self.commitmap = {} # map commit ID (":1") to commit object 148 self.commitmap = {} # map commit ID (":1") to commit object
143 self.copies = {} # map commit id to dict of file copies 154 self.copies = {} # map commit id to dict of file copies
144 155
145 self.tags = [] # list of (tag, mark) tuples 156 self.tags = [] # list of (tag, mark) tuples
146 157
147 self.numblobs = 0 # for progress reporting 158 self.numblobs = 0 # for progress reporting
148 self.blobdir = None 159 if blobpath:
160 self.blobpath = blobpath
161 self.blobpersist = True
162 else:
163 self.blobpath = os.path.join(self.repo.root, b".hg", b"blobs")
164 self.blobpersist = False
165 self.blobdb = None
166 self.compressor = zstd.ZstdCompressor(level = 6) # compress the blob
167 self.decompressor = zstd.ZstdDecompressor() # decompress the blob
149 168
150 def setup(self): 169 def setup(self):
151 """Setup before processing any streams.""" 170 """Setup before processing any streams."""
152 pass 171 pass
153 172
154 def teardown(self): 173 def teardown(self):
155 """Cleanup after processing all streams.""" 174 """Cleanup after processing all streams."""
156 if self.blobdir and os.path.exists(self.blobdir): 175 if self.blobdb:
157 self.ui.debug(b"Removing blob dir %s ...\n" % self.blobdir) 176 self.blobdb.commit()
158 shutil.rmtree(self.blobdir) 177 self.blobdb.close()
178 self.blobdb = None
179 if self.blobpersist:
180 self.ui.debug(b"Persisting blob database %s ...\n"
181 % self.blobpath)
182 else:
183 self.ui.debug(b"Removing blob database %s ...\n"
184 % self.blobpath)
185 os.remove(self.blobpath)
186 elif os.path.exists(self.blobpath):
187 if self.blobpersist:
188 self.ui.debug(b"Persisting blob directory %s ...\n"
189 % self.blobpath)
190 else:
191 self.ui.debug(b"Removing blob directory %s ...\n"
192 % self.blobpath)
193 shutil.rmtree(self.blobpath)
159 194
def progress_handler(self, cmd):
    """Relay a fast-import 'progress' command to the user."""
    text = cmd.message
    self.ui.write(b"Progress: %s\n" % text)
162 197
def blob_handler(self, cmd):
    """Store the payload of a fast-import 'blob' command."""
    blobid, payload = cmd.id, cmd.data
    self.writeblob(blobid, payload)
165 200
201 def _openblobdb(self):
202 self.blobdb = sqlite3.connect(self.blobpath)
203 self.blobdb.text_factory = str
204 self.blobdb.execute("CREATE TABLE IF NOT EXISTS blob(id varchar PRIMARY KEY NOT NULL, content blob NOT NULL)")
205
166 def _getblobfilename(self, blobid): 206 def _getblobfilename(self, blobid):
167 if self.blobdir is None:
168 raise RuntimeError("no blobs seen, so no blob directory created")
169 # XXX should escape ":" for windows 207 # XXX should escape ":" for windows
170 return os.path.join(self.blobdir, b"blob-" + blobid) 208 return os.path.join(self.blobpath, b"blob-" + blobid)
171
172 def getblob(self, fileid):
173 (commitid, blobid) = fileid
174 f = open(self._getblobfilename(blobid), "rb")
175 try:
176 return f.read()
177 finally:
178 f.close()
179 209
def writeblob(self, blobid, data):
    """Compress *data* and store it under *blobid*.

    An already-stored blob id is deliberately left untouched (INSERT OR
    IGNORE / skip-if-file-exists) — per the change description this cuts
    the disk write load when sources resend blobs on every run.
    """
    if self.compressor:
        data = self.compressor.compress(data)

    if have_sqlite:
        if self.blobdb is None:
            self._openblobdb()
        self.blobdb.execute(
            "INSERT OR IGNORE INTO blob(id, content) VALUES(?, ?)",
            (blobid, data))
    else:
        path = self._getblobfilename(blobid)
        if not os.path.isfile(path):
            if not os.path.exists(self.blobpath):
                os.mkdir(self.blobpath)
            with open(path, "wb") as fp:
                fp.write(data)

    self.numblobs += 1
    if self.numblobs % 500 == 0:
        self.ui.status(b"%d blobs read\n" % self.numblobs)
230
231 def getblob(self, fileid):
232 (commitid, blobid) = fileid
233
234 if have_sqlite:
235 if self.blobdb is None:
236 self._openblobdb()
237 with closing(self.blobdb.cursor()) as c:
238 c.execute("SELECT content FROM blob WHERE id=?", (blobid,))
239 data, = c.fetchone()
240 else:
241 fn = self._getblobfilename(blobid)
242 if os.path.isfile(fn):
243 with open(fn, "rb") as f:
244 data = f.read()
245
246 if not data:
247 raise RuntimeError("missing blob %s for fileid %s"
248 % (blobid, fileid))
249 if self.decompressor:
250 data = self.decompressor.decompress(data, 10**8)
251 return data
195 252
def getmode(self, name, fileid):
    """Return the recorded file mode for *name* in the blob's commit."""
    commitid, _blobid = fileid
    return self.filemodes[commitid][name]
199 256