diff hgext3rd/fastimport/vendor/python_fastimport/commands.py @ 86:28704a2a7461 vendor/python-fastimport

Import python-fastimport-0.9.8
author Roy Marples <roy@marples.name>
date Tue, 19 Jan 2021 22:56:34 +0000
parents
children 2fc99e3479d9
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/hgext3rd/fastimport/vendor/python_fastimport/commands.py	Tue Jan 19 22:56:34 2021 +0000
@@ -0,0 +1,530 @@
+# Copyright (C) 2008 Canonical Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+"""fast-import command classes.
+
+These objects are used by the parser to represent the content of
+a fast-import stream.
+"""
+from __future__ import division
+
+import re
+import stat
+import sys
+
+from fastimport.helpers import (
+    newobject as object,
+    utf8_bytes_string,
+    repr_bytes,
+    )
+
+
+# There is a bug in git 1.5.4.3 and older by which unquoting a string consumes
+# one extra character. Set this variable to True to work-around it. It only
+# happens when renaming a file whose name contains spaces and/or quotes, and
+# the symptom is:
+#   % git-fast-import
+#   fatal: Missing space after source: R "file 1.txt" file 2.txt
+# http://git.kernel.org/?p=git/git.git;a=commit;h=c8744d6a8b27115503565041566d97c21e722584
+GIT_FAST_IMPORT_NEEDS_EXTRA_SPACE_AFTER_QUOTE = False
+
+
+# Lists of command names
+COMMAND_NAMES = [b'blob', b'checkpoint', b'commit', b'feature', b'progress',
+    b'reset', b'tag']
+FILE_COMMAND_NAMES = [b'filemodify', b'filedelete', b'filecopy', b'filerename',
+    b'filedeleteall']
+
+# Feature names
+MULTIPLE_AUTHORS_FEATURE = b'multiple-authors'
+COMMIT_PROPERTIES_FEATURE = b'commit-properties'
+EMPTY_DIRS_FEATURE = b'empty-directories'
+FEATURE_NAMES = [
+    MULTIPLE_AUTHORS_FEATURE,
+    COMMIT_PROPERTIES_FEATURE,
+    EMPTY_DIRS_FEATURE,
+    ]
+
+
+class ImportCommand(object):
+    """Base class for import commands."""
+
+    def __init__(self, name):
+        self.name = name
+        # List of field names not to display
+        self._binary = []
+
+    def __str__(self):
+        return repr(self)
+
+    def __repr__(self):
+        if sys.version_info[0] == 2:
+            return self.__bytes__()
+        else:
+            return bytes(self).decode('utf8')
+
+    def __bytes__(self):
+        raise NotImplementedError(
+            'An implementation of __bytes__ is required'
+        )
+
+    def dump_str(self, names=None, child_lists=None, verbose=False):
+        """Dump fields as a string.
+
+        For debugging.
+
+        :param names: the list of fields to include or
+            None for all public fields
+        :param child_lists: dictionary of child command names to
+            fields for that child command to include
+        :param verbose: if True, prefix each line with the command class and
+            display fields as a dictionary; if False, dump just the field
+            values with tabs between them
+        """
+        interesting = {}
+        if names is None:
+            fields = [
+                k for k in list(self.__dict__.keys())
+                if not k.startswith(b'_')
+            ]
+        else:
+            fields = names
+        for field in fields:
+            value = self.__dict__.get(field)
+            if field in self._binary and value is not None:
+                value = b'(...)'
+            interesting[field] = value
+        if verbose:
+            return "%s: %s" % (self.__class__.__name__, interesting)
+        else:
+            return "\t".join([repr(interesting[k]) for k in fields])
+
+
+class BlobCommand(ImportCommand):
+
+    def __init__(self, mark, data, lineno=0):
+        ImportCommand.__init__(self, b'blob')
+        self.mark = mark
+        self.data = data
+        self.lineno = lineno
+        # Provide a unique id in case the mark is missing
+        if mark is None:
+            self.id = b'@' + ("%d" % lineno).encode('utf-8')
+        else:
+            self.id = b':' + mark
+        self._binary = [b'data']
+
+    def __bytes__(self):
+        if self.mark is None:
+            mark_line = b''
+        else:
+            mark_line = b"\nmark :" + self.mark
+        return (b'blob' + mark_line + b'\n' +
+                ('data %d\n' % len(self.data)).encode('utf-8') + self.data)
+
+
+class CheckpointCommand(ImportCommand):
+
+    def __init__(self):
+        ImportCommand.__init__(self, b'checkpoint')
+
+    def __bytes__(self):
+        return b'checkpoint'
+
+
+class CommitCommand(ImportCommand):
+
+    def __init__(self, ref, mark, author, committer, message, from_,
+        merges, file_iter, lineno=0, more_authors=None, properties=None):
+        ImportCommand.__init__(self, b'commit')
+        self.ref = ref
+        self.mark = mark
+        self.author = author
+        self.committer = committer
+        self.message = message
+        self.from_ = from_
+        self.merges = merges
+        self.file_iter = file_iter
+        self.more_authors = more_authors
+        self.properties = properties
+        self.lineno = lineno
+        self._binary = [b'file_iter']
+        # Provide a unique id in case the mark is missing
+        if self.mark is None:
+            self.id = b'@' + ('%d' % lineno).encode('utf-8')
+        else:
+            if isinstance(self.mark, (int)):
+                self.id = b':' + str(self.mark).encode('utf-8')
+            else:
+                self.id = b':' + self.mark
+
+    def copy(self, **kwargs):
+        if not isinstance(self.file_iter, list):
+            self.file_iter = list(self.file_iter)
+
+        fields = dict(
+            (key, value)
+            for key, value in self.__dict__.items()
+            if key not in ('id', 'name')
+            if not key.startswith('_')
+        )
+
+        fields.update(kwargs)
+
+        return CommitCommand(**fields)
+
+    def __bytes__(self):
+        return self.to_string(include_file_contents=True)
+
+
+    def to_string(self, use_features=True, include_file_contents=False):
+        """
+            @todo the name to_string is ambiguous since the method actually
+                returns bytes.
+        """
+        if self.mark is None:
+            mark_line = b''
+        else:
+            if isinstance(self.mark, (int)):
+                mark_line = b'\nmark :' + str(self.mark).encode('utf-8')
+            else:
+                mark_line = b'\nmark :' + self.mark
+
+        if self.author is None:
+            author_section = b''
+        else:
+            author_section = b'\nauthor ' + format_who_when(self.author)
+            if use_features and self.more_authors:
+                for author in self.more_authors:
+                    author_section += b'\nauthor ' + format_who_when(author)
+
+        committer = b'committer ' + format_who_when(self.committer)
+
+        if self.message is None:
+            msg_section = b''
+        else:
+            msg = self.message
+            msg_section = ('\ndata %d\n' % len(msg)).encode('ascii') + msg
+        if self.from_ is None:
+            from_line = b''
+        else:
+            from_line = b'\nfrom ' + self.from_
+        if self.merges is None:
+            merge_lines = b''
+        else:
+            merge_lines = b''.join([b'\nmerge ' + m
+                for m in self.merges])
+        if use_features and self.properties:
+            property_lines = []
+            for name in sorted(self.properties):
+                value = self.properties[name]
+                property_lines.append(b'\n' + format_property(name, value))
+            properties_section = b''.join(property_lines)
+        else:
+            properties_section = b''
+        if self.file_iter is None:
+            filecommands = b''
+        else:
+            if include_file_contents:
+                filecommands = b''.join([b'\n' + repr_bytes(c)
+                    for c in self.iter_files()])
+            else:
+                filecommands = b''.join([b'\n' + str(c)
+                    for c in self.iter_files()])
+        return b''.join([
+            b'commit ',
+            self.ref,
+            mark_line,
+            author_section + b'\n',
+            committer,
+            msg_section,
+            from_line,
+            merge_lines,
+            properties_section,
+            filecommands])
+
+    def dump_str(self, names=None, child_lists=None, verbose=False):
+        result = [ImportCommand.dump_str(self, names, verbose=verbose)]
+        for f in self.iter_files():
+            if child_lists is None:
+                continue
+            try:
+                child_names = child_lists[f.name]
+            except KeyError:
+                continue
+            result.append('\t%s' % f.dump_str(child_names, verbose=verbose))
+        return '\n'.join(result)
+
+    def iter_files(self):
+        """Iterate over files."""
+        # file_iter may be a callable or an iterator
+        if callable(self.file_iter):
+            return self.file_iter()
+        return iter(self.file_iter)
+
+
+class FeatureCommand(ImportCommand):
+
+    def __init__(self, feature_name, value=None, lineno=0):
+        ImportCommand.__init__(self, b'feature')
+        self.feature_name = feature_name
+        self.value = value
+        self.lineno = lineno
+
+    def __bytes__(self):
+        if self.value is None:
+            value_text = b''
+        else:
+            value_text = b'=' + self.value
+        return b'feature ' + self.feature_name + value_text
+
+
+class ProgressCommand(ImportCommand):
+
+    def __init__(self, message):
+        ImportCommand.__init__(self, b'progress')
+        self.message = message
+
+    def __bytes__(self):
+        return b'progress ' + self.message
+
+
+class ResetCommand(ImportCommand):
+
+    def __init__(self, ref, from_):
+        ImportCommand.__init__(self, b'reset')
+        self.ref = ref
+        self.from_ = from_
+
+    def __bytes__(self):
+        if self.from_ is None:
+            from_line = b''
+        else:
+            # According to git-fast-import(1), the extra LF is optional here;
+            # however, versions of git up to 1.5.4.3 had a bug by which the LF
+            # was needed. Always emit it, since it doesn't hurt and maintains
+            # compatibility with older versions.
+            # http://git.kernel.org/?p=git/git.git;a=commit;h=655e8515f279c01f525745d443f509f97cd805ab
+            from_line = b'\nfrom ' + self.from_ + b'\n'
+        return b'reset ' + self.ref + from_line
+
+
+class TagCommand(ImportCommand):
+
+    def __init__(self, id, from_, tagger, message):
+        ImportCommand.__init__(self, b'tag')
+        self.id = id
+        self.from_ = from_
+        self.tagger = tagger
+        self.message = message
+
+    def __bytes__(self):
+        if self.from_ is None:
+            from_line = b''
+        else:
+            from_line = b'\nfrom ' + self.from_
+        if self.tagger is None:
+            tagger_line = b''
+        else:
+            tagger_line = b'\ntagger ' + format_who_when(self.tagger)
+        if self.message is None:
+            msg_section = b''
+        else:
+            msg = self.message
+            msg_section = ('\ndata %d\n' % len(msg)).encode('ascii') + msg
+        return b'tag ' + self.id + from_line + tagger_line + msg_section
+
+
+class FileCommand(ImportCommand):
+    """Base class for file commands."""
+    pass
+
+
+class FileModifyCommand(FileCommand):
+
+    def __init__(self, path, mode, dataref, data):
+        # Either dataref or data should be null
+        FileCommand.__init__(self, b'filemodify')
+        self.path = check_path(path)
+        self.mode = mode
+        self.dataref = dataref
+        self.data = data
+        self._binary = [b'data']
+
+    def __bytes__(self):
+        return self.to_string(include_file_contents=True)
+
+    def __str__(self):
+        return self.to_string(include_file_contents=False)
+
+    def _format_mode(self, mode):
+        if mode in (0o755, 0o100755):
+            return b'755'
+        elif mode in (0o644, 0o100644):
+            return b'644'
+        elif mode == 0o40000:
+            return b'040000'
+        elif mode == 0o120000:
+            return b'120000'
+        elif mode == 0o160000:
+            return b'160000'
+        else:
+            raise AssertionError('Unknown mode %o' % mode)
+
+    def to_string(self, include_file_contents=False):
+        datastr = b''
+        if stat.S_ISDIR(self.mode):
+            dataref = b'-'
+        elif self.dataref is None:
+            dataref = b'inline'
+            if include_file_contents:
+                datastr = ('\ndata %d\n' % len(self.data)).encode('ascii') + self.data
+        else:
+            dataref = self.dataref
+        path = format_path(self.path)
+
+        return b' '.join(
+            [b'M', self._format_mode(self.mode), dataref, path + datastr])
+
+
+class FileDeleteCommand(FileCommand):
+
+    def __init__(self, path):
+        FileCommand.__init__(self, b'filedelete')
+        self.path = check_path(path)
+
+    def __bytes__(self):
+        return b' '.join([b'D', format_path(self.path)])
+
+
+class FileCopyCommand(FileCommand):
+
+    def __init__(self, src_path, dest_path):
+        FileCommand.__init__(self, b'filecopy')
+        self.src_path = check_path(src_path)
+        self.dest_path = check_path(dest_path)
+
+    def __bytes__(self):
+        return b' '.join([b'C',
+            format_path(self.src_path, quote_spaces=True),
+            format_path(self.dest_path)])
+
+
+class FileRenameCommand(FileCommand):
+
+    def __init__(self, old_path, new_path):
+        FileCommand.__init__(self, b'filerename')
+        self.old_path = check_path(old_path)
+        self.new_path = check_path(new_path)
+
+    def __bytes__(self):
+        return b' '.join([
+            b'R',
+            format_path(self.old_path, quote_spaces=True),
+            format_path(self.new_path)]
+        )
+
+
+class FileDeleteAllCommand(FileCommand):
+
+    def __init__(self):
+        FileCommand.__init__(self, b'filedeleteall')
+
+    def __bytes__(self):
+        return b'deleteall'
+
+
+class NoteModifyCommand(FileCommand):
+
+    def __init__(self, from_, data):
+        super(NoteModifyCommand, self).__init__(b'notemodify')
+        self.from_ = from_
+        self.data = data
+        self._binary = ['data']
+
+    def __bytes__(self):
+        return (b'N inline :' + self.from_ +
+                ('\ndata %d\n'% len(self.data)).encode('ascii') + self.data)
+
+
+def check_path(path):
+    """Check that a path is legal.
+
+    :return: the path if all is OK
+    :raise ValueError: if the path is illegal
+    """
+    if path is None or path == b'' or path.startswith(b'/'):
+        raise ValueError("illegal path '%s'" % path)
+
+    if (
+        (sys.version_info[0] >= 3 and not isinstance(path, bytes)) and
+        (sys.version_info[0] == 2 and not isinstance(path, str))
+    ):
+        raise TypeError("illegale type for path '%r'" % path)
+
+    return path
+
+
+def format_path(p, quote_spaces=False):
+    """Format a path in utf8, quoting it if necessary."""
+    if b'\n' in p:
+        p = re.sub(b'\n', b'\\n', p)
+        quote = True
+    else:
+        quote = p[0] == b'"' or (quote_spaces and b' ' in p)
+    if quote:
+        extra = GIT_FAST_IMPORT_NEEDS_EXTRA_SPACE_AFTER_QUOTE and b' ' or b''
+        p = b'"' + p + b'"' + extra
+    return p
+
+
+def format_who_when(fields):
+    """Format a tuple of name,email,secs-since-epoch,utc-offset-secs as a string."""
+    offset = fields[3]
+    if offset < 0:
+        offset_sign = b'-'
+        offset = abs(offset)
+    else:
+        offset_sign = b'+'
+    offset_hours = offset // 3600
+    offset_minutes = offset // 60 - offset_hours * 60
+    offset_str = offset_sign + ('%02d%02d' % (offset_hours, offset_minutes)).encode('ascii')
+    name = fields[0]
+
+    if name == b'':
+        sep = b''
+    else:
+        sep = b' '
+
+    name = utf8_bytes_string(name)
+
+    email = fields[1]
+
+    email = utf8_bytes_string(email)
+
+    return b''.join((name, sep, b'<', email, b'> ', ("%d" % fields[2]).encode('ascii'), b' ', offset_str))
+
+
+def format_property(name, value):
+    """Format the name and value (both unicode) of a property as a string."""
+    result = b''
+    utf8_name = utf8_bytes_string(name)
+
+    result = b'property ' + utf8_name
+    if value is not None:
+        utf8_value = utf8_bytes_string(value)
+        result += b' ' + ('%d' % len(utf8_value)).encode('ascii') + b' ' + utf8_value
+
+    return result