Skip to content

git

Utilities to get data out of git repositories.

This file defines a base class, GitRepo, which uses straight-up calling git commands, and needs Git to be installed.

Usage:

Example usage:

from diffannotator.utils.git import GitRepo files = GitRepo('path/to/git/repo').list_files('HEAD') # 'HEAD' is the default ... ...

This implementation / backend retrieves data by calling git via subprocess.Popen or subprocess.run, and parsing the output.

WARNING: at the time this backend does not have error handling implemented; it would simply return empty result, without any notification about the error (like incorrect repository path, or incorrect commit)!!!

AuthorStat

Bases: NamedTuple

Parsed result of 'git shortlog -c -s'

Source code in src/diffannotator/utils/git.py
61
62
63
64
class AuthorStat(NamedTuple):
    """Parsed result of 'git shortlog -c -s'"""
    author: str  #: author name (commit authorship info)
    count: int = 0  #: number of commits per author

ChangeSet

Bases: PatchSet

Commit changes, together with commit data

Note that changeset can come from a commit, or from a diff between two different commits (tree-ish)

Source code in src/diffannotator/utils/git.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
class ChangeSet(PatchSet):
    """Commit changes, together with commit data

    Note that changeset can come from a commit, or from a diff
    between two different commits (tree-ish)
    """
    RE_DIFF_GIT_HEADER_GENERIC = re.compile(
        pattern=r'^diff --git [^\t\n]+ [^\t\n]+',
        flags=re.MULTILINE
    )
    RE_ALL_SHA1_FULL = re.compile(r'^[0-9a-f]{40}$')
    # TODO: support SHA-256 object names
    # https://git-scm.com/docs/hash-function-transition

    def __init__(self, patch_source: Union[StringIO, TextIO, str], commit_id: str,
                 prev: Optional[str] = None, newline: Optional[str] = '\n',
                 *args, **kwargs):
        """ChangeSet class constructor

        :param patch_source: patch source to be parsed by PatchSet (parent class)
        :param commit_id: oid of the "after" commit (tree-ish) for the change
        :param prev: previous state, when ChangeSet is generated with .unidiff(),
            or `None` it the change corresponds to a commit (assumed first-parent)
        :param newline: determines how to parse newline characters from the stream,
            and how the stream was opened (one of None i.e. universal newline,
            '', '\n', '\r', and '\r\n' - same as `open`)
        :param args: passed to PatchSet constructor
        :param kwargs: passed to PatchSet constructor (recommended);
            PatchSet uses `encoding` (str) and `metadata_only` (bool): :raw-html:`<br />`
            if `encoding` is `None`, assume we are reading Unicode data,
            when `metadata_only` is `True`, only perform a minimal metadata parsing
            (i.e. hunks without content) which is around 2.5-6 times faster;
            it will still validate the diff metadata consistency and get counts
        """
        # with universal newline you don't need to do translation, because it is already done
        # with '\n' as newline you don't need to do translation, because it has correct EOLs
        # this means that `newline` can be '\r' or '\r\n'
        if newline is not None and newline != '\n':
            # TODO: create a proper wrapper around io.StringIO.readline()
            if isinstance(patch_source, StringIO):
                patch_source = patch_source.getvalue()
            elif isinstance(patch_source, TextIOWrapper):
                patch_source = patch_source.read()
            patch_source = StringIO(patch_source.replace(newline, '\n'))

        super().__init__(patch_source, *args, **kwargs)
        self.commit_id = commit_id
        self.prev = prev

        # retrieve commit metadata from patch, if possible
        self.commit_metadata: Optional[dict] = None
        if prev is None or prev.endswith("^"):
            if isinstance(patch_source, StringIO):
                patch_source.seek(0)
                patch_text = patch_source.getvalue()
            elif isinstance(patch_source, TextIOWrapper):
                patch_source.seek(0)
                patch_text = patch_source.read()
            else:
                patch_text = patch_source
            match = re.search(self.RE_DIFF_GIT_HEADER_GENERIC,
                              patch_text)
            if match:
                pos = match.start()
                commit_text = patch_text[:pos]
                # -1 is to remove newline from empty line separating commit text from diff
                self.commit_metadata = _parse_commit_text(commit_text[:-1],
                                                          with_parents_line=False)

    # override
    @classmethod
    def from_filename(cls, filename: Union[str, Path], encoding: str = DEFAULT_ENCODING,
                      errors: Optional[str] = ENCODING_ERRORS, newline: Optional[str] = '\n') -> 'ChangeSet':
        """Return a ChangeSet instance given a diff filename.

        :param filename: path to a diff file with the patch to parse
        :param encoding: the name of the encoding used to decode or encode
            the diff file
        :param errors: specifies how encoding and decoding errors are to be
            handled (one of None, 'strict', 'ignore', 'replace', 'backslashreplace',
            or 'surrogateescape' - same as `open`)
        :param newline: determines how to parse newline characters from the stream
            (one of None, '', '\n', '\r', and '\r\n' - same as `open`)
        :return: instance of ChangeSet class
        """
        # NOTE: unconditional `file_path = Path(filename)` would also work
        if isinstance(filename, Path):
            file_path = filename
        else:
            file_path = Path(filename)

        # try to extract commit_id from basename of the file, for example
        # from filename == 'e54746bdf7d5c831eabe4dcea76a7626f1de73df.diff'
        commit_id = ''
        base_name = file_path.stem
        if re.fullmatch(cls.RE_ALL_SHA1_FULL, base_name):
            commit_id = base_name

        # slightly modified contents of PatchSet.from_filename() alternate constructor
        with file_path.open(mode='r', encoding=encoding, errors=errors, newline=newline) as fp:
            obj = cls(fp, commit_id=commit_id, newline=newline)  # PatchSet.from_filename() has type mismatch

        # adjust commit_id if we were able to retrieve commit metadata from file
        if commit_id != '' and obj.commit_metadata is not None:
            obj.commit_id = obj.commit_metadata['id']

        return obj

__init__(patch_source, commit_id, prev=None, newline='\n', *args, **kwargs)

ChangeSet class constructor

    :param patch_source: patch source to be parsed by PatchSet (parent class)
    :param commit_id: oid of the "after" commit (tree-ish) for the change
    :param prev: previous state, when ChangeSet is generated with .unidiff(),
        or `None` it the change corresponds to a commit (assumed first-parent)
    :param newline: determines how to parse newline characters from the stream,
        and how the stream was opened (one of None i.e. universal newline,
        '', '

', ' ', and ' ' - same as open) :param args: passed to PatchSet constructor :param kwargs: passed to PatchSet constructor (recommended); PatchSet uses encoding (str) and metadata_only (bool): :raw-html:<br /> if encoding is None, assume we are reading Unicode data, when metadata_only is True, only perform a minimal metadata parsing (i.e. hunks without content) which is around 2.5-6 times faster; it will still validate the diff metadata consistency and get counts

Source code in src/diffannotator/utils/git.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def __init__(self, patch_source: Union[StringIO, TextIO, str], commit_id: str,
             prev: Optional[str] = None, newline: Optional[str] = '\n',
             *args, **kwargs):
    """ChangeSet class constructor

    :param patch_source: patch source to be parsed by PatchSet (parent class)
    :param commit_id: oid of the "after" commit (tree-ish) for the change
    :param prev: previous state, when ChangeSet is generated with .unidiff(),
        or `None` it the change corresponds to a commit (assumed first-parent)
    :param newline: determines how to parse newline characters from the stream,
        and how the stream was opened (one of None i.e. universal newline,
        '', '\n', '\r', and '\r\n' - same as `open`)
    :param args: passed to PatchSet constructor
    :param kwargs: passed to PatchSet constructor (recommended);
        PatchSet uses `encoding` (str) and `metadata_only` (bool): :raw-html:`<br />`
        if `encoding` is `None`, assume we are reading Unicode data,
        when `metadata_only` is `True`, only perform a minimal metadata parsing
        (i.e. hunks without content) which is around 2.5-6 times faster;
        it will still validate the diff metadata consistency and get counts
    """
    # with universal newline you don't need to do translation, because it is already done
    # with '\n' as newline you don't need to do translation, because it has correct EOLs
    # this means that `newline` can be '\r' or '\r\n'
    if newline is not None and newline != '\n':
        # TODO: create a proper wrapper around io.StringIO.readline()
        if isinstance(patch_source, StringIO):
            patch_source = patch_source.getvalue()
        elif isinstance(patch_source, TextIOWrapper):
            patch_source = patch_source.read()
        patch_source = StringIO(patch_source.replace(newline, '\n'))

    super().__init__(patch_source, *args, **kwargs)
    self.commit_id = commit_id
    self.prev = prev

    # retrieve commit metadata from patch, if possible
    self.commit_metadata: Optional[dict] = None
    if prev is None or prev.endswith("^"):
        if isinstance(patch_source, StringIO):
            patch_source.seek(0)
            patch_text = patch_source.getvalue()
        elif isinstance(patch_source, TextIOWrapper):
            patch_source.seek(0)
            patch_text = patch_source.read()
        else:
            patch_text = patch_source
        match = re.search(self.RE_DIFF_GIT_HEADER_GENERIC,
                          patch_text)
        if match:
            pos = match.start()
            commit_text = patch_text[:pos]
            # -1 is to remove newline from empty line separating commit text from diff
            self.commit_metadata = _parse_commit_text(commit_text[:-1],
                                                      with_parents_line=False)

from_filename(filename, encoding=DEFAULT_ENCODING, errors=ENCODING_ERRORS, newline='\n') classmethod

Return a ChangeSet instance given a diff filename.

    :param filename: path to a diff file with the patch to parse
    :param encoding: the name of the encoding used to decode or encode
        the diff file
    :param errors: specifies how encoding and decoding errors are to be
        handled (one of None, 'strict', 'ignore', 'replace', 'backslashreplace',
        or 'surrogateescape' - same as `open`)
    :param newline: determines how to parse newline characters from the stream
        (one of None, '', '

', ' ', and ' ' - same as open) :return: instance of ChangeSet class

Source code in src/diffannotator/utils/git.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
@classmethod
def from_filename(cls, filename: Union[str, Path], encoding: str = DEFAULT_ENCODING,
                  errors: Optional[str] = ENCODING_ERRORS, newline: Optional[str] = '\n') -> 'ChangeSet':
    """Return a ChangeSet instance given a diff filename.

    :param filename: path to a diff file with the patch to parse
    :param encoding: the name of the encoding used to decode or encode
        the diff file
    :param errors: specifies how encoding and decoding errors are to be
        handled (one of None, 'strict', 'ignore', 'replace', 'backslashreplace',
        or 'surrogateescape' - same as `open`)
    :param newline: determines how to parse newline characters from the stream
        (one of None, '', '\n', '\r', and '\r\n' - same as `open`)
    :return: instance of ChangeSet class
    """
    # NOTE: unconditional `file_path = Path(filename)` would also work
    if isinstance(filename, Path):
        file_path = filename
    else:
        file_path = Path(filename)

    # try to extract commit_id from basename of the file, for example
    # from filename == 'e54746bdf7d5c831eabe4dcea76a7626f1de73df.diff'
    commit_id = ''
    base_name = file_path.stem
    if re.fullmatch(cls.RE_ALL_SHA1_FULL, base_name):
        commit_id = base_name

    # slightly modified contents of PatchSet.from_filename() alternate constructor
    with file_path.open(mode='r', encoding=encoding, errors=errors, newline=newline) as fp:
        obj = cls(fp, commit_id=commit_id, newline=newline)  # PatchSet.from_filename() has type mismatch

    # adjust commit_id if we were able to retrieve commit metadata from file
    if commit_id != '' and obj.commit_metadata is not None:
        obj.commit_id = obj.commit_metadata['id']

    return obj

DiffSide

Bases: Enum

Enum to be used for side parameter of GitRepo.list_changed_files

Source code in src/diffannotator/utils/git.py
46
47
48
49
50
51
class DiffSide(Enum):
    """Enum to be used for `side` parameter of `GitRepo.list_changed_files`"""
    PRE = 'pre'
    POST = 'post'
    A = 'pre'
    B = 'post'

GitRepo

Class representing Git repository, for performing operations on

Source code in src/diffannotator/utils/git.py
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
class GitRepo:
    """Class representing Git repository, for performing operations on"""
    path_encoding = 'utf8'
    default_file_encoding = 'utf8'
    log_encoding = 'utf8'
    fallback_encoding = 'latin1'  # must be 8-bit encoding
    encoding_errors = ENCODING_ERRORS
    # see 346245a1bb ("hard-code the empty tree object", 2008-02-13)
    # https://github.com/git/git/commit/346245a1bb6272dd370ba2f7b9bf86d3df5fed9a
    # https://github.com/git/git/commit/e1ccd7e2b1cae8d7dab4686cddbd923fb6c46953
    empty_tree_sha1 = '4b825dc642cb6eb9a060e54bf8d69288fbee4904'

    def __init__(self, repo_dir: PathLike):
        """Constructor for `GitRepo` class

        :param repo_dir: path to the Git repository
        """
        # TODO: check that `git_directory` is a path to git repository
        # TODO: remember absolute path (it is safer)
        self.repo = Path(repo_dir)

    def __repr__(self):
        class_name = type(self).__name__
        return f"{class_name}(repo_dir={self.repo!r})"

    def __str__(self):
        return f"{self.repo!s}"

    @classmethod
    def clone_repository(cls,
                         repository: PathLike,
                         directory: Optional[PathLike] = None,
                         working_dir: Optional[PathLike] = None,
                         reference_local_repository: Optional[PathLike] = None,
                         dissociate: bool = False,
                         make_path_absolute: bool = False) -> Union['GitRepo', None]:
        """Clone a repository into a new directory, return cloned GitRepo

        If there is non-empty directory preventing from cloning the repository,
        the method assumes that it is because the repository was already cloned;
        in this case it returns that directory as `GitRepo`.

        :param repository: The (possibly remote) repository to clone from,
            usually a URL (ssh, git, http, or https) or a local path.
        :param directory: The name of a new directory to clone into, optional.
            The "human-ish" part of the source repository is used if `directory`
            is not provided (if it is `None`).
        :param working_dir: The directory where to run the
            `git-clone https://git-scm.com/docs/git-clone` operation;
            otherwise current working directory is used.  The value
            of this parameter does not matter if `directory` is provided,
        :param reference_local_repository: Use `reference_local_repository`
            to avoid network transfer, and to reduce local storage costs
        :param dissociate: whether to dissociate with `reference_local_repository`,
            used only if `reference_local_repository` is not None
        :param make_path_absolute: Ensure that returned `GitRepo` uses absolute path
        :return: Cloned repository as `GitRepo` if operation was successful,
            otherwise `None`.
        """
        # TODO: make it @classmethod (to be able to use in constructor)
        def _to_repo_path(a_path: str) -> PathLike:
            if make_path_absolute:
                if Path(a_path).is_absolute():
                    return a_path
                else:
                    return Path(working_dir or '').joinpath(a_path).absolute()

            return a_path

        args = ['git']
        if working_dir is not None:
            args.extend(['-C', str(working_dir)])
        if reference_local_repository:
            args.extend([
                'clone', f'--reference-if-able={reference_local_repository}'
            ])
            if dissociate:
                args.append('--dissociate')
            args.append(repository)
        else:
            args.extend([
                'clone', repository
            ])
        if directory is not None:
            args.append(str(directory))

        # https://serverfault.com/questions/544156/git-clone-fail-instead-of-prompting-for-credentials
        env = {
            'GIT_TERMINAL_PROMPT': '0',
            'GIT_SSH_COMMAND': 'ssh -oBatchMode=yes',
            'GIT_ASKPASS': 'echo',
            'SSH_ASKPASS': 'echo',
            'GCM_INTERACTIVE': 'never',
        }

        result = subprocess.run(args, capture_output=True, env=env)
        if result.returncode == 128:
            # TODO: log a warning about the problem
            #print(f"{result.stderr=}")
            # try again without environment variables, in case of firewall problem like
            # fatal: unable to access 'https://github.com/githubtraining/hellogitworld.git/':
            #        getaddrinfo() thread failed to start
            result = subprocess.run(args, capture_output=True)

        # we are interested only in the directory where the repository was cloned into
        # that's why we are using GitRepo.path_encoding (instead of 'utf8', for example)

        if result.returncode == 128:
            # repository was already cloned
            for line in result.stderr.decode(GitRepo.path_encoding).splitlines():
                match = re.match(r"fatal: destination path '(.*)' already exists and is not an empty directory.", line)
                if match:
                    return GitRepo(_to_repo_path(match.group(1)))

            # could not find where repository is
            return None

        elif result.returncode != 0:
            # other error
            return None

        for line in result.stderr.decode(GitRepo.path_encoding).splitlines():
            match = re.match(r"Cloning into '(.*)'...", line)
            if match:
                return GitRepo(_to_repo_path(match.group(1)))

        return None

    def format_patch(self,
                     output_dir: Optional[PathLike] = None,
                     revision_range: Union[str, Iterable[str]] = ('-1', 'HEAD')) -> str:
        """Generate patches out of specified revisions, saving them as individual files

        :param output_dir: output directory for patches; if not set (the default),
            save patches in the current working directory
        :param revision_range: arguments to pass to `git format-patch`, see
            https://git-scm.com/docs/git-format-patch; by default generates single patch
            from the HEAD
        :return: output from the `git format-patch` process
        """
        # NOTE: it should be ':param \*args' or ':param \\*args', but for the bug in PyCharm
        cmd = [
            'git', '-C', str(self.repo),
            'format-patch'
        ]
        if output_dir is not None:
            cmd.extend([
                '--output-directory', str(output_dir)
            ])
        if isinstance(revision_range, str):
            cmd.append(revision_range)
        else:
            cmd.extend(revision_range)

        # NOTE: specifying `encoding` or `errors` turns on `text` == `universal_newlines`
        # and you cannot say `text=False` and/or `universal_newlines=False` to turn it off
        # The output of the `git format-patch` command can contain embedded `\r` (CR)
        process = subprocess.run(cmd,
                                 capture_output=True, check=True)
        # MAYBE: better checks for process.returncode, and examine process.stderr
        if process.returncode == 0:
            return process.stdout.decode(encoding='utf-8', errors=self.encoding_errors)
        else:
            return process.stderr.decode(encoding='utf-8', errors=self.encoding_errors)

    def list_files(self, commit: str = 'HEAD') -> list[str]:
        """Retrieve list of files at given revision in a repository

        :param str commit:
            The commit for which to list all files.  Defaults to 'HEAD',
            that is the current commit
        :return: List of full path names of all files in the repository.
        :rtype: list[str]
        """
        args = [
            'git', '-C', str(self.repo), 'ls-tree',
            '-r', '--name-only', '--full-tree', '-z',
            commit
        ]
        # TODO: consider replacing with subprocess.run()
        process = subprocess.Popen(args, stdout=subprocess.PIPE)
        result = process.stdout.read() \
                     .decode(GitRepo.path_encoding) \
                     .split('\0')[:-1]
        process.stdout.close()  # to avoid ResourceWarning: unclosed file <_io.BufferedReader name=3>
        process.wait()  # to avoid ResourceWarning: subprocess NNN is still running
        # TODO: add error checking
        return result

    def list_changed_files(self, commit: str = 'HEAD',
                           side: DiffSide = DiffSide.POST) -> list[str]:
        """Retrieve list of files changed at given revision in repo

        NOTE: not tested for merge commits, especially "evil merges"
        with respect to file names.

        :param str commit:
            The commit for which to list changes.  Defaults to 'HEAD',
            that is the current commit.  The changes are relative to
            commit^, that is the previous commit (first parent of the
            given commit).

        :param DiffSide side:
            Whether to use names of files in post-image (after changes)
            with side=DiffSide.POST, or pre-image names (before changes)
            with side=DiffSide.PRE.  Renames are detected by Git.

        :return: full path names of files changed in `commit`.
        :rtype: list[str]
        """
        if side == DiffSide.PRE:
            changes_status = self.diff_file_status(commit)
            return [
                pre for (pre, _) in changes_status.keys()
                if pre is not None  # TODO: check how deleted files work with side=DiffSide.POST
            ]

        if side != DiffSide.POST:
            raise NotImplementedError(f"GitRepo.list_changed_files: unsupported side={side} parameter")

        # --no-commit-id is needed for 1-argument git-diff-tree
        cmd = [
            'git', '-C', self.repo, 'diff-tree', '-M',
            '-r', '--name-only', '--no-commit-id', '-z',
            commit
        ]
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        result = process.stdout.read() \
                     .decode(GitRepo.path_encoding) \
                     .split('\0')[:-1]
        process.stdout.close()  # to avoid ResourceWarning: unclosed file <_io.BufferedReader name=3>
        process.wait()  # to avoid ResourceWarning: subprocess NNN is still running

        return result

    def diff_file_status(self, commit: str = 'HEAD',
                         prev: Optional[str] = None) -> dict[tuple[str, str], str]:
        """Retrieve status of file changes at given revision in repo

        It returns in a structured way information equivalent to the one
        from calling 'git diff --name-status -r'.

        Example output:
            {
                (None, 'added_file'): 'A',
                ('file_to_be_deleted', None): 'D',
                ('mode_changed', 'mode_changed'): 'M',
                ('modified', 'modified'): 'M',
                ('to_be_renamed', 'renamed'): 'R'
            }

        :param commit: The commit for which to list changes for.
            Defaults to 'HEAD', that is the current commit.
        :type: str
        :param prev: The commit for which to list changes from.
            If not set, then changes are relative to the parent of
            the `commit` parameter, which means 'commit^'.
        :type: str or None
        :return: Information about the status of each change.
            Returns a mapping (a dictionary), where the key is the pair (tuple)
            of pre-image and post-image pathname, and the value is a
            single letter denoting the status / type of the change.

            For new (added) files the pre-image path is `None`, and for deleted
            files the post-image path is `None`.

            Possible status letters are:
             - 'A': addition of a file,
             - 'C': copy of a file into a new one (not for all implementations),
             - 'D': deletion of a file,
             - 'M': modification of the contents or mode of a file,
             - 'R': renaming of a file,
             - 'T': change in the type of the file (untested).

        :rtype: dict[tuple[str,str],str]
        """
        if prev is None:
            # NOTE: this means first-parent changes for merge commits
            prev = commit + '^'

        cmd = [
            'git', '-C', self.repo, 'diff-tree', '--no-commit-id',
            # turn on renames [with '-M' or '-C'];
            # note that parsing is a bit easier without '-z', assuming that filenames are sane
            # increase inexact rename detection limit
            '--find-renames', '-l5000', '--name-status', '-r',
            prev, commit
        ]
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        lines = process.stdout.read().decode(GitRepo.path_encoding).splitlines()
        result = {}
        for line in lines:
            if line[0] == 'R' or line[0] == 'C':
                status, old, new = line.split("\t")
                result[(old, new)] = status[0]  # no similarity info
            else:
                status, path = line.split("\t")
                if status == 'A':
                    result[(None, path)] = status
                elif status == 'D':
                    result[(path, None)] = status
                else:
                    result[(path, path)] = status

        process.stdout.close()  # to avoid ResourceWarning: unclosed file <_io.BufferedReader name=3>
        process.wait()  # to avoid ResourceWarning: subprocess NNN is still running

        return result

    def changed_lines_extents(self, commit: str = 'HEAD',
                              prev: Optional[str] = None,
                              side: DiffSide = DiffSide.POST) -> tuple[dict[str, list[tuple[int, int]]],
                                                                       dict[str, list[PatchLine]]]:
        """List target line numbers of changed files as extents, for each changed file

        For each changed file that appears in `side` side of the diff between
        given commits, it returns list of `side` line numbers (e.g. target line
        numbers for post=DiffSide.POST).

        Line numbers are returned compressed as extents, that is list of
        tuples of start and end range.  For example, if target line numbers
        would be [1, 2, 3, 7, 10, 11], then their extent list would be
        [(1, 3), (7, 7), (10, 11)].

        To make it easier to mesh with other parts of computation, and to
        avoid reparsing diffs, also return parsed patch lines (diff lines).

        Uses :func:`GitRepo.unidiff` to parse git diff between `prev` and `commit`.

        Used by :func:`GitRepo.changes_survival`.

        :param str commit: later (second) of two commits to compare,
            defaults to 'HEAD', that is the current commit
        :param str or None prev: earlier (first) of two commits to compare,
            defaults to None, which means comparing to parent of `commit`
        :param DiffSide side: Whether to use names of files in post-image (after changes)
            with side=DiffSide.POST, or pre-image names (before changes)
            with side=DiffSide.PRE.  Renames are detected by Git.
            Defaults to DiffSide.POST, which is currently the only value
            supported.
        :return: two dicts, with changed files names as keys,
            first with information about change lines extents,
            second with parsed change lines (only for added lines)
        :rtype: (dict[str, list[tuple[int, int]]], dict[str, list[PatchLine]])
        """
        # TODO: implement also for DiffSide.PRE
        if side != DiffSide.POST:
            raise NotImplementedError(f"GitRepo.changed_lines_extents: unsupported side={side} parameter")

        patch = self.unidiff(commit=commit, prev=prev)
        file_ranges = {}
        file_diff_lines_added = defaultdict(list)
        for patched_file in patch:
            if patched_file.is_removed_file:  # no post-image for removed files
                continue
            line_ranges = []
            for hunk in patched_file:
                (range_beg, range_end) = (None, None)
                for line in hunk:
                    # we are interested only in ranges of added lines (in post-image)
                    if line.is_added:
                        if range_beg is None:  # first added line in line range
                            range_beg = line.target_line_no
                        range_end = line.target_line_no

                        file_diff_lines_added[patched_file.path].append(
                            line
                        )

                    else:  # deleted line, context line, or "No newline at end of file" line
                        if range_beg is not None:
                            line_ranges.append((range_beg, range_end))
                            range_beg = None

                # if diff ends with added line
                if range_beg is not None:
                    line_ranges.append((range_beg, range_end))

            file_ranges[patched_file.path] = line_ranges

        return file_ranges, file_diff_lines_added

    @overload
    def unidiff(self, commit: str = ..., prev: Optional[str] = ..., wrap: Literal[True] = ...) -> ChangeSet:
        ...

    @overload
    def unidiff(self, commit: str = ..., prev: Optional[str] = ..., *, wrap: Literal[False]) -> Union[str, bytes]:
        ...

    @overload
    def unidiff(self, commit: str = ..., prev: Optional[str] = ..., wrap: bool = ...) -> Union[str, bytes, ChangeSet]:
        ...

    def unidiff(self, commit='HEAD', prev=None, wrap=True):
        """Return unified diff between `commit` and `prev`

        If `prev` is None (which is the default), return diff between the
        `commit` and its first parent, or between the `commit` and the empty
        tree if `commit` does not have any parents (if it is a root commit).

        If `wrap` is True (which is the default), wrap the result in
        unidiff.PatchSet to make it easier to extract information from
        the diff.  Otherwise, return diff as plain text.

        :param str commit: later (second) of two commits to compare,
            defaults to 'HEAD', that is the current commit
        :param prev: earlier (first) of two commits to compare,
            defaults to None, which means comparing to parent of `commit`
        :type prev: str or None
        :param bool wrap: whether to wrap the result in PatchSet
        :return: the changes between two arbitrary commits,
            `prev` and `commit`
        :rtype: str or bytes or ChangeSet
        """
        if prev is None:
            try:
                # NOTE: this means first-parent changes for merge commits
                return self.unidiff(commit=commit, prev=commit + '^', wrap=wrap)
            except subprocess.CalledProcessError:
                # commit^ does not exist for a root commits (for first commits)
                return self.unidiff(commit=commit, prev=self.empty_tree_sha1, wrap=wrap)

        cmd = [
            'git', '-C', self.repo,
            'diff', '--find-renames', '--find-copies', '--find-copies-harder',
            prev, commit
        ]
        process = subprocess.run(cmd,
                                 capture_output=True, check=True)
        try:
            diff_output = process.stdout.decode(self.default_file_encoding)
        except UnicodeDecodeError:
            # unidiff.PatchSet can only handle strings
            diff_output = process.stdout.decode(self.fallback_encoding)

        if wrap:
            return ChangeSet(diff_output, self.to_oid(commit),
                             prev=prev)
        else:
            return diff_output

    @overload
    def log_p(self, revision_range: Union[str, Iterable[str]] = ..., wrap: Literal[True] = ...) \
            -> Iterator[ChangeSet]:
        ...

    @overload
    def log_p(self, revision_range: Union[str, Iterable[str]] = ..., wrap: Literal[False] = ...) \
            -> Iterator[str]:
        ...

    @overload
    def log_p(self, revision_range: Union[str, Iterable[str]] = ..., wrap: bool = ...) \
            -> Union[Iterator[str], Iterator[ChangeSet]]:
        ...

    def log_p(self, revision_range=('-1', 'HEAD'), wrap=True):
        """Generate commits with unified diffs for a given `revision_range`

        If `revision_range` is not provided, it generates single most recent
        commit on the current branch.

        The `wrap` parameter controls the output format.  If true (the
        default), generate series of `unidiff.PatchSet` for commits changes.
        If false, generate series of raw commit + unified diff of commit
        changes (as `str`).  This is similar to how `unidiff()` method works.

        :param revision_range: arguments to pass to `git log --patch`, see
            https://git-scm.com/docs/git-log; by default generates single patch
            from the HEAD
        :param wrap: whether to wrap the result in PatchSet
        :return: the changes for given `revision_range`
        """
        def commit_with_patch(_commit_id: str, _commit_data: StringIO) -> ChangeSet:
            """Helper to create ChangeSet with from _commit_data stream"""
            _commit_data.seek(0)  # rewind to beginning for reading by the PatchSet constructor
            return ChangeSet(_commit_data, _commit_id)

        cmd = [
            'git', '-C', str(self.repo),
            # NOTE: `git rev-list` does not support --patch option
            'log', '--format=raw', '--diff-merges=first-parent', '--patch', '-z',  # log options
            '--find-renames', '--find-copies', '--find-copies-harder',  # diff options
        ]
        if isinstance(revision_range, str):
            cmd.append(revision_range)
        else:
            cmd.extend(revision_range)

        ## DEBUG (TODO: switch to logger.debug())
        #print(f"{cmd=}")

        # NOTE: specifying `encoding` or `errors` turns on `text` == `universal_newlines`
        # and you cannot say `text=False` and/or `universal_newlines=False` to turn it off
        # The output of the `git log -p` command can contain embedded `\r` (CR)
        process = subprocess.Popen(
            cmd,
            #bufsize=1,  # line buffered
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,  # TODO: consider capturing stderr
        )

        commit_data = StringIO()
        commit_id: Optional[str] = None
        # if we are waiting on the process, readline can return empty line
        # if the process ends, we can still have lines in buffer
        while (log_p_line_raw := process.stdout.readline()) or process.poll() is None:
            log_p_line = log_p_line_raw.decode(
                encoding='utf-8',
                errors=self.encoding_errors,
            )
            if log_p_line:
                if not commit_id and log_p_line[0] != '\0':
                    # first line in output
                    commit_id = log_p_line.strip()[7:]  # strip "commit "

                if log_p_line[0] == '\0':
                    # end of old commit, start of new commit
                    ## DEBUG (TODO: switch to logger.debug())
                    #print(f"new commit: {log_p_line[1:]}", end="")
                    # return old commit data
                    if wrap:
                        yield commit_with_patch(commit_id, commit_data)
                    else:
                        yield commit_data.getvalue()
                    # start gathering data for a new commit
                    commit_data.truncate(0)
                    # strip the '\0' separator
                    log_p_line = log_p_line[1:]
                    commit_id = log_p_line.strip()[7:]  # strip "commit "

                # gather next line of commit data
                commit_data.write(log_p_line)

        if commit_data.tell() > 0:
            # there is gathered data from the last commit
            ## DEBUG (TODO: switch to logger.debug())
            #print("last commit")
            if wrap:
                yield commit_with_patch(commit_id, commit_data)
            else:
                yield commit_data.getvalue()

        return_code = process.wait()
        if return_code != 0:
            print(f"Error running 'git log' for {self.repo.name} repo, error code = {return_code}")
            print(f"- repository path: '{self.repo}'")

    def _file_contents_process(self, commit: str, path: str) -> subprocess.Popen[bytes]:
        cmd = [
            'git', '-C', self.repo, 'show',  # or 'git', '-C', self.repo, 'cat-file', 'blob',
            # assumed that 'commit' is sane
            f'{commit}:{path}'
        ]
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE)

        return process

    def file_contents(self, commit: str, path: str, encoding: Optional[str] = None) -> str:
        """Retrieve contents of given file at given revision / tree

        :param str commit: The commit for which to return file contents.
        :param str path: Path to a file, relative to the top-level of the repository
        :param encoding: Encoding of the file (optional)
        :type: str or None
        :return: Contents of the file with given path at given revision
        :rtype: str
        """
        if encoding is None:
            encoding = GitRepo.default_file_encoding

        process = self._file_contents_process(commit, path)
        result = process.stdout.read().decode(encoding=encoding, errors=self.encoding_errors)
        # NOTE: does not handle errors correctly yet
        process.stdout.close()  # to avoid ResourceWarning: unclosed file <_io.BufferedReader name=3>
        process.wait()  # to avoid ResourceWarning: subprocess NNN is still running

        return result

    @contextmanager
    def open_file(self, commit: str, path: str) -> BufferedReader:
        """Open given file at given revision / tree as binary file

        Works as a context manager, like `pathlib.Path.open()`:
            >>> with GitRepo('/path/to/repo').open_file('v1', 'example_file') as fpb:
            ...     contents = fpb.read().decode('utf8')
            ...

        :param str commit: The commit for which to return file contents.
        :param str path: Path to a file, relative to the top-level of the repository
        :return: file object, opened in binary mode
        :rtype: io.BufferedReader
        """
        process = self._file_contents_process(commit, path)
        try:
            yield process.stdout
        finally:
            # NOTE: does not handle errors correctly yet
            process.stdout.close()  # to avoid ResourceWarning: unclosed file <_io.BufferedReader name=3>
            process.wait()  # to avoid ResourceWarning: subprocess NNN is still running

    def list_tags(self) -> list[str]:
        """Retrieve list of all tags in the repository

        :return: List of all tags in the repository.
        :rtype: list[str]
        """
        cmd = ['git', '-C', self.repo, 'tag', '--list']
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        # NOTE: f.readlines() might be not the best solution
        tags = [line.decode(GitRepo.path_encoding).rstrip()
                for line in process.stdout.readlines()]

        process.stdout.close()  # to avoid ResourceWarning: unclosed file <_io.BufferedReader name=3>
        process.wait()  # to avoid ResourceWarning: subprocess NNN is still running

        return tags

    def create_tag(self, tag_name: str, commit: str = 'HEAD') -> None:
        """Create lightweight tag (refs/tags/* ref) to the given commit

        NOTE: does not support annotated tags for now; among others it
        would require deciding on tagger identity (at least for some
        backends).

        :param str tag_name: Name of tag to be created.
            Should follow `git check-ref-format` rules for name;
            see https://git-scm.com/docs/git-check-ref-format ;
            for example they cannot contain space ' ', tilde '~', caret '^',
            or colon ':'.  Those rules are NOT checked.
        :param str commit: Revision to be tagged.  Defaults to 'HEAD'.
        :rtype: None
        """
        cmd = [
            'git', '-C', self.repo, 'tag', tag_name, commit,
        ]
        # we are interested in effects of the command, not its output
        subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True)

    def get_commit_metadata(self, commit: str = 'HEAD') -> dict[str, Union[str, dict, list]]:
        """Retrieve metadata about given commit

        :param str commit: The commit to examine.
            Defaults to 'HEAD', that is the current (most recent) commit.
        :return: Information about selected parts of commit metadata,
            the following format:

            {
                'id': 'f8ffd4067d1f1b902ae06c52db4867f57a424f38',
                'parents': ['fe4a622e5202cd990c8ec853d56e25922f263243'],
                'tree': '5347fe7b8606e7a164ab5cd355ee5d86c99796c0'
                'author': {
                    'author': 'A U Thor <author@example.com>',
                    'name': 'A U Thor',
                    'email': 'author@example.com',
                    'timestamp': 1112912053,
                    'tz_info': '-0600',
                },
                'committer': {
                    'committer': 'C O Mitter <committer@example.com>'
                    'name': 'C O Mitter',
                    'email': 'committer@example.com',
                    'timestamp': 1693598847,
                    'tz_info': '+0200',
                },
                'message': 'Commit summary\n\nOptional longer description\n',
            }

            TODO: use dataclass for result (for computed fields)

        :rtype: dict
        """
        # NOTE: using low level git 'plumbing' command means 'utf8' encoding is not assured
        # same as in `parse_commit` in gitweb/gitweb.perl in https://github.com/git/git
        # https://github.com/git/git/blob/3525f1dbc18ae36ca9c671e807d6aac2ac432600/gitweb/gitweb.perl#L3591C5-L3591C17
        cmd = [
            'git', '-C', self.repo, 'rev-list',
            '--parents', '--header', '--max-count=1', commit,
            '--'
        ]
        process = subprocess.run(cmd, capture_output=True, check=True)
        return _parse_commit_text(
            process.stdout.decode(GitRepo.log_encoding, errors=self.encoding_errors),
            # next parameters depend on the git command used
            with_parents_line=True, indented_body=True
        )

    def find_commit_by_timestamp(self, timestamp: Union[str, int], start_commit: str = 'HEAD') -> str:
        """Find first commit in repository older than given date

        :param timestamp: Date in UNIX epoch format, also known as timestamp format.
            Returned commit would be older than this date.
        :type: int or str
        :param str start_commit: The commit from which to start walking through commits,
            trying to find the one we want.  Defaults to 'HEAD'
        :return: Full SHA-1 identifier of found commit.

            WARNING: there is currently no support for error handling,
            among others for not finding any commit that fulfills
            the condition.  At least it is not tested.

        :rtype: str
        """
        cmd = [
            'git', '-C', self.repo, 'rev-list',
            f'--min-age={timestamp}', '-1',
            start_commit
        ]
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        # this should be US-ASCII hexadecimal identifier
        result = process.stdout.read().decode('latin1').strip()
        # NOTE: does not handle errors correctly yet

        process.stdout.close()  # to avoid ResourceWarning: unclosed file <_io.BufferedReader name=3>
        process.wait()  # to avoid ResourceWarning: subprocess NNN is still running

        return result

    def to_oid(self, obj: str) -> Union[str, None]:
        """Convert object reference to object identifier

        Returns None if object `obj` is not present in the repository

        :param str obj: object reference, for example "HEAD" or "main^",
            see e.g. https://git-scm.com/docs/gitrevisions
        :return: SHA-1 identifier of object, or None if object is not found
        :rtype: str or None
        """
        cmd = [
            'git', '-C', self.repo,
            'rev-parse', '--verify', '--end-of-options', obj
        ]
        try:
            # emits SHA-1 identifier if object is found in the repo; otherwise, errors out
            process = subprocess.run(cmd, capture_output=True, check=True)
        except subprocess.CalledProcessError:
            return None

        # SHA-1 is ASCII only
        return process.stdout.decode('latin1').strip()

    def is_valid_commit(self, commit: str) -> bool:
        """Check if `commit` is present in the repository as a commit

        :param str commit: reference to a commit, for example "HEAD" or "main",
            or "fc6db4e600d633d6fc206217e70641bbb78cbc53^"
        :return: whether `commit` is a valid commit in repo
        :rtype: bool
        """
        return self.to_oid(str(commit) + '^{commit}') is not None

    def get_current_branch(self) -> Union[str, None]:
        """Return short name of the current branch

        It returns name of the branch, e.g. "main", rather than fully
        qualified name (full name), e.g. "refs/heads/main".

        Will return None if there is no current branch, that is if
        repo is in the 'detached HEAD' state.

        :return: name of the current branch
        :rtype: str or None
        """
        cmd = [
            'git', '-C', self.repo,
            'symbolic-ref', '--quiet', '--short', 'HEAD'
        ]
        try:
            # Using '--quiet' means that the command would not issue an error message
            # but exit with non-zero status silently if HEAD is not a symbolic ref, but detached HEAD
            process = subprocess.run(cmd,
                                     capture_output=True, check=True,
                                     # branch names cannot contain '\r' (CR) character,
                                     # see https://git-scm.com/docs/git-check-ref-format
                                     text=True, errors=self.encoding_errors)
        except subprocess.CalledProcessError:
            return None

        return process.stdout.strip()

    def resolve_symbolic_ref(self, ref: str = 'HEAD') -> Union[str, None]:
        """Return full name of reference `ref` symbolic ref points to

        If `ref` is not symbolic reference (e.g. ref='HEAD' and detached
        HEAD state) it returns None.

        :param str ref: name of the symbolic reference, e.g. "HEAD"
        :return: resolved `ref`
        :rtype: str or None
        """
        cmd = [
            'git', '-C', self.repo,
            'symbolic-ref', '--quiet', str(ref)
        ]
        try:
            # Using '--quiet' means that the command would not issue an error message
            # but exit with non-zero status silently if `ref` is not a symbolic ref
            process = subprocess.run(cmd,
                                     capture_output=True, check=True,
                                     # branch names and symbolic refereces cannot contain '\r' (CR),
                                     # see https://git-scm.com/docs/git-check-ref-format
                                     text=True, errors=self.encoding_errors)
        except subprocess.CalledProcessError:
            return None

        return process.stdout.strip()

    def _to_refs_list(self, ref_pattern: Union[str, list[str]] = 'HEAD') -> list[str]:
        # support single patter or list of patterns
        # TODO: use variable number of parameters instead (?)
        if not isinstance(ref_pattern, list):
            ref_pattern = [ref_pattern]

        return filter(
            # filter out cases of detached HEAD, resolved to None (no branch)
            lambda x: x is not None,
            map(
                # resolve symbolic references, currently only 'HEAD' is resolved
                lambda x: x if x != 'HEAD' else self.resolve_symbolic_ref(x),
                ref_pattern
            )
        )

    # TODO?: change name to `list_merged_into`
    def check_merged_into(self, commit: str, ref_pattern: Union[str, list[str]] = 'HEAD') -> list[str]:
        """List those refs among `ref_pattern` that contain given `commit`

        This method can be used to check if a given `commit` is merged into
        at least one ref matching `ref_pattern` using 'git for-each-ref --contains',
        see https://git-scm.com/docs/git-for-each-ref

        Return list of refs that contain given commit, or in other words
        list of refs that given commit is merged into.

        Note that symbolic refs, such as 'HEAD', are expanded.

        :param str commit: The commit to check if it is merged
        :param ref_pattern: <pattern>…, that is a pattern or list of patterns;
            check each ref that match against at least one patterns, either using
            fnmatch(3) or literally, in the latter case matching completely,
            or from the beginning up to a slash.  Defaults to 'HEAD'.
        :type ref_pattern: str or list[str]
        :return: list of refs matching `ref_pattern` that `commit` is merged into
            (that contain given `commit`)
        :rtype: list[str]
        """
        ref_pattern = self._to_refs_list(ref_pattern)

        cmd = [
            'git', '-C', self.repo,
            'for-each-ref', f'--contains={commit}',  # only list refs which contain the specified commit
            '--format=%(refname)',  # we only need list of refs that fulfill the condition mentioned above
            *ref_pattern
        ]
        process = subprocess.run(cmd,
                                 capture_output=True, check=True,
                                 # branch and other refs names cannot contain '\r' (CR),
                                 # see https://git-scm.com/docs/git-check-ref-format
                                 text=True, errors=self.encoding_errors)
        return process.stdout.splitlines()

    def count_commits(self,
                      start_from: str = StartLogFrom.CURRENT,
                      until_commit: str = None,
                      first_parent: bool = False) -> int:
        """Count number of commits in the repository

        Starting from `start_from`, count number of commits, stopping
        at `until_commit` if provided.

        If `first_parent` is set to True, makes Git follow only the first
        parent commit upon seeing a merge commit.

        :param start_from: where to start from to follow 'parent' links
        :type start_from: str or StartLogFrom
        :param until_commit: where to stop following 'parent' links;
            also ensures that we follow ancestry path to it, optional
        :type until_commit: str or None
        :param bool first_parent: follow only the first parent commit
            upon seeing a merge commit
        :return: number of commits
        :rtype: int
        """
        if hasattr(start_from, 'value'):
            start_from = start_from.value
        cmd = [
            'git', '-C', self.repo,
            'rev-list', '--count', str(start_from),
        ]
        if until_commit is not None:
            cmd.extend(['--not', until_commit, f'--ancestry-path={until_commit}', '--boundary'])
        if first_parent:
            cmd.append('--first-parent')
        process = subprocess.run(cmd,
                                 capture_output=True, check=True,
                                 # `git rev-list --count <start>` returns a number, no '\r' possible
                                 encoding='utf-8', errors=self.encoding_errors)

        return int(process.stdout)

    def list_authors_shortlog(self, start_from: str = StartLogFrom.ALL) -> list[Union[str, bytes]]:
        """List all authors using git-shortlog

        Summarizes the history of the project by providing list of authors
        together with their commit counts.  Uses `git shortlog --summary`
        internally.

        :param start_from: where to start from to follow 'parent' links
        :type start_from: str or StartLogFrom
        :return: list of authors together with their commit count,
            in the 'SPACE* <count> TAB <author>' format
        :rtype: list[str|bytes]
        """
        if hasattr(start_from, 'value'):
            start_from = start_from.value
        elif start_from is None:
            start_from = '--all'
        cmd = [
            'git', '-C', self.repo,
            'shortlog',
            '--summary',  # Suppress commit description and provide a commit count summary only.
            '-n',  # Sort output according to the number of commits per author
            start_from,
        ]
        process = subprocess.run(cmd, capture_output=True, check=True)
        try:
            # try to return text
            return process.stdout.decode(GitRepo.log_encoding, errors=self.encoding_errors).splitlines()
        except UnicodeDecodeError:
            # if not possible, return bytes
            return process.stdout.splitlines()

    def find_roots(self, start_from: str = StartLogFrom.CURRENT) -> list[str]:
        """Find root commits (commits without parents), starting from `start_from`

        :param start_from: where to start from to follow 'parent' links
        :type start_from: str or StartLogFrom
        :return: list of root commits, as SHA-1
        :rtype: list[str]
        """
        if hasattr(start_from, 'value'):
            start_from = start_from.value
        elif start_from is None:
            start_from = 'HEAD'

        cmd = [
            'git', '-C', self.repo,
            'rev-list', '--max-parents=0',  # gives all root commits
            str(start_from),
        ]
        process = subprocess.run(cmd,
                                 capture_output=True, check=True,
                                 # the Git command above returns list of commit identifiers
                                 # separated by newlines, therefore no '\r' in output possible
                                 text=True, errors=self.encoding_errors)
        return process.stdout.splitlines()

    def get_config(self, name: str, value_type: Optional[str] = None) -> Union[str, None]:
        """Query specific git config option

        If there is no Git configuration variable named `name`,
        then it returns None.

        :param str name: name of configuration option, for example
            'remote.origin.url' or 'user.name'
        :param value_type: name of git type to canonicalize outgoing value,
            see https://git-scm.com/docs/git-config#Documentation/git-config.txt---typelttypegt
            optional
        :type value_type: Literal['bool', 'int', 'bool-or-int', 'path', 'expiry-date', 'color'] or None
        :return: value of requested git configuration variable
        :rtype: str or None
        """
        cmd = [
            'git', '-C', self.repo,
            'config', str(name)
        ]
        if value_type is not None:
            cmd.append(f"--type={value_type}")

        try:
            process = subprocess.run(cmd,
                                     capture_output=True, check=True)
            return process.stdout.decode(errors=self.encoding_errors).strip()

        except subprocess.CalledProcessError as err:
            # This command will fail with non-zero status upon error. Some exit codes are:
            # - The section or key is invalid (ret=1),
            # - ...
            if err.returncode == 1:
                return None
            else:
                raise err

__init__(repo_dir)

Constructor for GitRepo class

:param repo_dir: path to the Git repository

Source code in src/diffannotator/utils/git.py
465
466
467
468
469
470
471
472
def __init__(self, repo_dir: PathLike):
    """Constructor for `GitRepo` class

    :param repo_dir: path to the Git repository
    """
    # TODO: check that `git_directory` is a path to git repository
    # TODO: remember absolute path (it is safer)
    self.repo = Path(repo_dir)

changed_lines_extents(commit='HEAD', prev=None, side=DiffSide.POST)

List target line numbers of changed files as extents, for each changed file

For each changed file that appears in side side of the diff between given commits, it returns list of side line numbers (e.g. target line numbers for post=DiffSide.POST).

Line numbers are returned compressed as extents, that is list of tuples of start and end range. For example, if target line numbers would be [1, 2, 3, 7, 10, 11], then their extent list would be [(1, 3), (7, 7), (10, 11)].

To make it easier to mesh with other parts of computation, and to avoid reparsing diffs, also return parsed patch lines (diff lines).

Uses :func:GitRepo.unidiff to parse git diff between prev and commit.

Used by :func:GitRepo.changes_survival.

:param str commit: later (second) of two commits to compare, defaults to 'HEAD', that is the current commit :param str or None prev: earlier (first) of two commits to compare, defaults to None, which means comparing to parent of commit :param DiffSide side: Whether to use names of files in post-image (after changes) with side=DiffSide.POST, or pre-image names (before changes) with side=DiffSide.PRE. Renames are detected by Git. Defaults to DiffSide.POST, which is currently the only value supported. :return: two dicts, with changed files names as keys, first with information about change lines extents, second with parsed change lines (only for added lines) :rtype: (dict[str, list[tuple[int, int]]], dict[str, list[PatchLine]])

Source code in src/diffannotator/utils/git.py
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
def changed_lines_extents(self, commit: str = 'HEAD',
                          prev: Optional[str] = None,
                          side: DiffSide = DiffSide.POST) -> tuple[dict[str, list[tuple[int, int]]],
                                                                   dict[str, list[PatchLine]]]:
    """List target line numbers of changed files as extents, for each changed file

    For each changed file that appears in `side` side of the diff between
    given commits, it returns list of `side` line numbers (e.g. target line
    numbers for post=DiffSide.POST).

    Line numbers are returned compressed as extents, that is list of
    tuples of start and end range.  For example, if target line numbers
    would be [1, 2, 3, 7, 10, 11], then their extent list would be
    [(1, 3), (7, 7), (10, 11)].

    To make it easier to mesh with other parts of computation, and to
    avoid reparsing diffs, also return parsed patch lines (diff lines).

    Uses :func:`GitRepo.unidiff` to parse git diff between `prev` and `commit`.

    Used by :func:`GitRepo.changes_survival`.

    :param str commit: later (second) of two commits to compare,
        defaults to 'HEAD', that is the current commit
    :param str or None prev: earlier (first) of two commits to compare,
        defaults to None, which means comparing to parent of `commit`
    :param DiffSide side: Whether to use names of files in post-image (after changes)
        with side=DiffSide.POST, or pre-image names (before changes)
        with side=DiffSide.PRE.  Renames are detected by Git.
        Defaults to DiffSide.POST, which is currently the only value
        supported.
    :return: two dicts, with changed files names as keys,
        first with information about change lines extents,
        second with parsed change lines (only for added lines)
    :rtype: (dict[str, list[tuple[int, int]]], dict[str, list[PatchLine]])
    """
    # TODO: implement also for DiffSide.PRE
    if side != DiffSide.POST:
        raise NotImplementedError(f"GitRepo.changed_lines_extents: unsupported side={side} parameter")

    patch = self.unidiff(commit=commit, prev=prev)
    file_ranges = {}
    file_diff_lines_added = defaultdict(list)
    for patched_file in patch:
        if patched_file.is_removed_file:  # no post-image for removed files
            continue
        line_ranges = []
        for hunk in patched_file:
            (range_beg, range_end) = (None, None)
            for line in hunk:
                # we are interested only in ranges of added lines (in post-image)
                if line.is_added:
                    if range_beg is None:  # first added line in line range
                        range_beg = line.target_line_no
                    range_end = line.target_line_no

                    file_diff_lines_added[patched_file.path].append(
                        line
                    )

                else:  # deleted line, context line, or "No newline at end of file" line
                    if range_beg is not None:
                        line_ranges.append((range_beg, range_end))
                        range_beg = None

            # if diff ends with added line
            if range_beg is not None:
                line_ranges.append((range_beg, range_end))

        file_ranges[patched_file.path] = line_ranges

    return file_ranges, file_diff_lines_added

check_merged_into(commit, ref_pattern='HEAD')

List those refs among ref_pattern that contain given commit

This method can be used to check if a given commit is merged into at least one ref matching ref_pattern using 'git for-each-ref --contains', see https://git-scm.com/docs/git-for-each-ref

Return list of refs that contain given commit, or in other words list of refs that given commit is merged into.

Note that symbolic refs, such as 'HEAD', are expanded.

:param str commit: The commit to check if it is merged :param ref_pattern: …, that is a pattern or list of patterns; check each ref that match against at least one patterns, either using fnmatch(3) or literally, in the latter case matching completely, or from the beginning up to a slash. Defaults to 'HEAD'. :type ref_pattern: str or list[str] :return: list of refs matching ref_pattern that commit is merged into (that contain given commit) :rtype: list[str]

Source code in src/diffannotator/utils/git.py
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
def check_merged_into(self, commit: str, ref_pattern: Union[str, list[str]] = 'HEAD') -> list[str]:
    """List those refs among `ref_pattern` that contain given `commit`

    This method can be used to check if a given `commit` is merged into
    at least one ref matching `ref_pattern` using 'git for-each-ref --contains',
    see https://git-scm.com/docs/git-for-each-ref

    Return list of refs that contain given commit, or in other words
    list of refs that given commit is merged into.

    Note that symbolic refs, such as 'HEAD', are expanded.

    :param str commit: The commit to check if it is merged
    :param ref_pattern: <pattern>…, that is a pattern or list of patterns;
        check each ref that match against at least one patterns, either using
        fnmatch(3) or literally, in the latter case matching completely,
        or from the beginning up to a slash.  Defaults to 'HEAD'.
    :type ref_pattern: str or list[str]
    :return: list of refs matching `ref_pattern` that `commit` is merged into
        (that contain given `commit`)
    :rtype: list[str]
    """
    ref_pattern = self._to_refs_list(ref_pattern)

    cmd = [
        'git', '-C', self.repo,
        'for-each-ref', f'--contains={commit}',  # only list refs which contain the specified commit
        '--format=%(refname)',  # we only need list of refs that fulfill the condition mentioned above
        *ref_pattern
    ]
    process = subprocess.run(cmd,
                             capture_output=True, check=True,
                             # branch and other refs names cannot contain '\r' (CR),
                             # see https://git-scm.com/docs/git-check-ref-format
                             text=True, errors=self.encoding_errors)
    return process.stdout.splitlines()

clone_repository(repository, directory=None, working_dir=None, reference_local_repository=None, dissociate=False, make_path_absolute=False) classmethod

Clone a repository into a new directory, return cloned GitRepo

If there is non-empty directory preventing from cloning the repository, the method assumes that it is because the repository was already cloned; in this case it returns that directory as GitRepo.

:param repository: The (possibly remote) repository to clone from, usually a URL (ssh, git, http, or https) or a local path. :param directory: The name of a new directory to clone into, optional. The "human-ish" part of the source repository is used if directory is not provided (if it is None). :param working_dir: The directory where to run the git-clone https://git-scm.com/docs/git-clone operation; otherwise current working directory is used. The value of this parameter does not matter if directory is provided, :param reference_local_repository: Use reference_local_repository to avoid network transfer, and to reduce local storage costs :param dissociate: whether to dissociate with reference_local_repository, used only if reference_local_repository is not None :param make_path_absolute: Ensure that returned GitRepo uses absolute path :return: Cloned repository as GitRepo if operation was successful, otherwise None.

Source code in src/diffannotator/utils/git.py
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
@classmethod
def clone_repository(cls,
                     repository: PathLike,
                     directory: Optional[PathLike] = None,
                     working_dir: Optional[PathLike] = None,
                     reference_local_repository: Optional[PathLike] = None,
                     dissociate: bool = False,
                     make_path_absolute: bool = False) -> Union['GitRepo', None]:
    """Clone a repository into a new directory, return cloned GitRepo

    If there is non-empty directory preventing from cloning the repository,
    the method assumes that it is because the repository was already cloned;
    in this case it returns that directory as `GitRepo`.

    :param repository: The (possibly remote) repository to clone from,
        usually a URL (ssh, git, http, or https) or a local path.
    :param directory: The name of a new directory to clone into, optional.
        The "human-ish" part of the source repository is used if `directory`
        is not provided (if it is `None`).
    :param working_dir: The directory where to run the
        `git-clone https://git-scm.com/docs/git-clone` operation;
        otherwise current working directory is used.  The value
        of this parameter does not matter if `directory` is provided,
    :param reference_local_repository: Use `reference_local_repository`
        to avoid network transfer, and to reduce local storage costs
    :param dissociate: whether to dissociate with `reference_local_repository`,
        used only if `reference_local_repository` is not None
    :param make_path_absolute: Ensure that returned `GitRepo` uses absolute path
    :return: Cloned repository as `GitRepo` if operation was successful,
        otherwise `None`.
    """
    # TODO: make it @classmethod (to be able to use in constructor)
    def _to_repo_path(a_path: str) -> PathLike:
        if make_path_absolute:
            if Path(a_path).is_absolute():
                return a_path
            else:
                return Path(working_dir or '').joinpath(a_path).absolute()

        return a_path

    args = ['git']
    if working_dir is not None:
        args.extend(['-C', str(working_dir)])
    if reference_local_repository:
        args.extend([
            'clone', f'--reference-if-able={reference_local_repository}'
        ])
        if dissociate:
            args.append('--dissociate')
        args.append(repository)
    else:
        args.extend([
            'clone', repository
        ])
    if directory is not None:
        args.append(str(directory))

    # https://serverfault.com/questions/544156/git-clone-fail-instead-of-prompting-for-credentials
    env = {
        'GIT_TERMINAL_PROMPT': '0',
        'GIT_SSH_COMMAND': 'ssh -oBatchMode=yes',
        'GIT_ASKPASS': 'echo',
        'SSH_ASKPASS': 'echo',
        'GCM_INTERACTIVE': 'never',
    }

    result = subprocess.run(args, capture_output=True, env=env)
    if result.returncode == 128:
        # TODO: log a warning about the problem
        #print(f"{result.stderr=}")
        # try again without environment variables, in case of firewall problem like
        # fatal: unable to access 'https://github.com/githubtraining/hellogitworld.git/':
        #        getaddrinfo() thread failed to start
        result = subprocess.run(args, capture_output=True)

    # we are interested only in the directory where the repository was cloned into
    # that's why we are using GitRepo.path_encoding (instead of 'utf8', for example)

    if result.returncode == 128:
        # repository was already cloned
        for line in result.stderr.decode(GitRepo.path_encoding).splitlines():
            match = re.match(r"fatal: destination path '(.*)' already exists and is not an empty directory.", line)
            if match:
                return GitRepo(_to_repo_path(match.group(1)))

        # could not find where repository is
        return None

    elif result.returncode != 0:
        # other error
        return None

    for line in result.stderr.decode(GitRepo.path_encoding).splitlines():
        match = re.match(r"Cloning into '(.*)'...", line)
        if match:
            return GitRepo(_to_repo_path(match.group(1)))

    return None

count_commits(start_from=StartLogFrom.CURRENT, until_commit=None, first_parent=False)

Count number of commits in the repository

Starting from start_from, count number of commits, stopping at until_commit if provided.

If first_parent is set to True, makes Git follow only the first parent commit upon seeing a merge commit.

:param start_from: where to start from to follow 'parent' links :type start_from: str or StartLogFrom :param until_commit: where to stop following 'parent' links; also ensures that we follow ancestry path to it, optional :type until_commit: str or None :param bool first_parent: follow only the first parent commit upon seeing a merge commit :return: number of commits :rtype: int

Source code in src/diffannotator/utils/git.py
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
def count_commits(self,
                  start_from: str = StartLogFrom.CURRENT,
                  until_commit: str = None,
                  first_parent: bool = False) -> int:
    """Count number of commits in the repository

    Starting from `start_from`, count number of commits, stopping
    at `until_commit` if provided.

    If `first_parent` is set to True, makes Git follow only the first
    parent commit upon seeing a merge commit.

    :param start_from: where to start from to follow 'parent' links
    :type start_from: str or StartLogFrom
    :param until_commit: where to stop following 'parent' links;
        also ensures that we follow ancestry path to it, optional
    :type until_commit: str or None
    :param bool first_parent: follow only the first parent commit
        upon seeing a merge commit
    :return: number of commits
    :rtype: int
    """
    if hasattr(start_from, 'value'):
        start_from = start_from.value
    cmd = [
        'git', '-C', self.repo,
        'rev-list', '--count', str(start_from),
    ]
    if until_commit is not None:
        cmd.extend(['--not', until_commit, f'--ancestry-path={until_commit}', '--boundary'])
    if first_parent:
        cmd.append('--first-parent')
    process = subprocess.run(cmd,
                             capture_output=True, check=True,
                             # `git rev-list --count <start>` returns a number, no '\r' possible
                             encoding='utf-8', errors=self.encoding_errors)

    return int(process.stdout)

create_tag(tag_name, commit='HEAD')

Create lightweight tag (refs/tags/* ref) to the given commit

NOTE: does not support annotated tags for now; among others it would require deciding on tagger identity (at least for some backends).

:param str tag_name: Name of tag to be created. Should follow git check-ref-format rules for name; see https://git-scm.com/docs/git-check-ref-format ; for example they cannot contain space ' ', tilde '~', caret '^', or colon ':'. Those rules are NOT checked. :param str commit: Revision to be tagged. Defaults to 'HEAD'. :rtype: None

Source code in src/diffannotator/utils/git.py
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
def create_tag(self, tag_name: str, commit: str = 'HEAD') -> None:
    """Create lightweight tag (refs/tags/* ref) to the given commit

    NOTE: does not support annotated tags for now; among others it
    would require deciding on tagger identity (at least for some
    backends).

    :param str tag_name: Name of tag to be created.
        Should follow `git check-ref-format` rules for name;
        see https://git-scm.com/docs/git-check-ref-format ;
        for example they cannot contain space ' ', tilde '~', caret '^',
        or colon ':'.  Those rules are NOT checked.
    :param str commit: Revision to be tagged.  Defaults to 'HEAD'.
    :rtype: None
    """
    cmd = [
        'git', '-C', self.repo, 'tag', tag_name, commit,
    ]
    # we are interested in effects of the command, not its output
    subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True)

diff_file_status(commit='HEAD', prev=None)

Retrieve status of file changes at given revision in repo

It returns in a structured way information equivalent to the one from calling 'git diff --name-status -r'.

Example output

{ (None, 'added_file'): 'A', ('file_to_be_deleted', None): 'D', ('mode_changed', 'mode_changed'): 'M', ('modified', 'modified'): 'M', ('to_be_renamed', 'renamed'): 'R' }

:param commit: The commit for which to list changes for. Defaults to 'HEAD', that is the current commit. :type: str :param prev: The commit for which to list changes from. If not set, then changes are relative to the parent of the commit parameter, which means 'commit^'. :type: str or None :return: Information about the status of each change. Returns a mapping (a dictionary), where the key is the pair (tuple) of pre-image and post-image pathname, and the value is a single letter denoting the status / type of the change.

For new (added) files the pre-image path is `None`, and for deleted
files the post-image path is `None`.

Possible status letters are:
 - 'A': addition of a file,
 - 'C': copy of a file into a new one (not for all implementations),
 - 'D': deletion of a file,
 - 'M': modification of the contents or mode of a file,
 - 'R': renaming of a file,
 - 'T': change in the type of the file (untested).

:rtype: dict[tuple[str,str],str]

Source code in src/diffannotator/utils/git.py
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
def diff_file_status(self, commit: str = 'HEAD',
                     prev: Optional[str] = None) -> dict[tuple[str, str], str]:
    """Retrieve status of file changes at given revision in repo

    It returns in a structured way information equivalent to the one
    from calling 'git diff --name-status -r'.

    Example output:
        {
            (None, 'added_file'): 'A',
            ('file_to_be_deleted', None): 'D',
            ('mode_changed', 'mode_changed'): 'M',
            ('modified', 'modified'): 'M',
            ('to_be_renamed', 'renamed'): 'R'
        }

    :param commit: The commit for which to list changes for.
        Defaults to 'HEAD', that is the current commit.
    :type: str
    :param prev: The commit for which to list changes from.
        If not set, then changes are relative to the parent of
        the `commit` parameter, which means 'commit^'.
    :type: str or None
    :return: Information about the status of each change.
        Returns a mapping (a dictionary), where the key is the pair (tuple)
        of pre-image and post-image pathname, and the value is a
        single letter denoting the status / type of the change.

        For new (added) files the pre-image path is `None`, and for deleted
        files the post-image path is `None`.

        Possible status letters are:
         - 'A': addition of a file,
         - 'C': copy of a file into a new one (not for all implementations),
         - 'D': deletion of a file,
         - 'M': modification of the contents or mode of a file,
         - 'R': renaming of a file,
         - 'T': change in the type of the file (untested).

    :rtype: dict[tuple[str,str],str]
    """
    if prev is None:
        # NOTE: this means first-parent changes for merge commits
        prev = commit + '^'

    cmd = [
        'git', '-C', self.repo, 'diff-tree', '--no-commit-id',
        # turn on renames [with '-M' or '-C'];
        # note that parsing is a bit easier without '-z', assuming that filenames are sane
        # increase inexact rename detection limit
        '--find-renames', '-l5000', '--name-status', '-r',
        prev, commit
    ]
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    lines = process.stdout.read().decode(GitRepo.path_encoding).splitlines()
    result = {}
    for line in lines:
        if line[0] == 'R' or line[0] == 'C':
            status, old, new = line.split("\t")
            result[(old, new)] = status[0]  # no similarity info
        else:
            status, path = line.split("\t")
            if status == 'A':
                result[(None, path)] = status
            elif status == 'D':
                result[(path, None)] = status
            else:
                result[(path, path)] = status

    process.stdout.close()  # to avoid ResourceWarning: unclosed file <_io.BufferedReader name=3>
    process.wait()  # to avoid ResourceWarning: subprocess NNN is still running

    return result

file_contents(commit, path, encoding=None)

Retrieve contents of given file at given revision / tree

:param str commit: The commit for which to return file contents. :param str path: Path to a file, relative to the top-level of the repository :param encoding: Encoding of the file (optional) :type: str or None :return: Contents of the file with given path at given revision :rtype: str

Source code in src/diffannotator/utils/git.py
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
def file_contents(self, commit: str, path: str, encoding: Optional[str] = None) -> str:
    """Retrieve contents of given file at given revision / tree

    :param str commit: The commit for which to return file contents.
    :param str path: Path to a file, relative to the top-level of the repository
    :param encoding: Encoding of the file (optional)
    :type: str or None
    :return: Contents of the file with given path at given revision
    :rtype: str
    """
    if encoding is None:
        encoding = GitRepo.default_file_encoding

    process = self._file_contents_process(commit, path)
    result = process.stdout.read().decode(encoding=encoding, errors=self.encoding_errors)
    # NOTE: does not handle errors correctly yet
    process.stdout.close()  # to avoid ResourceWarning: unclosed file <_io.BufferedReader name=3>
    process.wait()  # to avoid ResourceWarning: subprocess NNN is still running

    return result

find_commit_by_timestamp(timestamp, start_commit='HEAD')

Find first commit in repository older than given date

:param timestamp: Date in UNIX epoch format, also known as timestamp format. Returned commit would be older than this date. :type: int or str :param str start_commit: The commit from which to start walking through commits, trying to find the one we want. Defaults to 'HEAD' :return: Full SHA-1 identifier of found commit.

WARNING: there is currently no support for error handling,
among others for not finding any commit that fulfills
the condition.  At least it is not tested.

:rtype: str

Source code in src/diffannotator/utils/git.py
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
def find_commit_by_timestamp(self, timestamp: Union[str, int], start_commit: str = 'HEAD') -> str:
    """Find first commit in repository older than given date

    :param timestamp: Date in UNIX epoch format, also known as timestamp format.
        Returned commit would be older than this date.
    :type: int or str
    :param str start_commit: The commit from which to start walking through commits,
        trying to find the one we want.  Defaults to 'HEAD'
    :return: Full SHA-1 identifier of found commit.

        WARNING: there is currently no support for error handling,
        among others for not finding any commit that fulfills
        the condition.  At least it is not tested.

    :rtype: str
    """
    cmd = [
        'git', '-C', self.repo, 'rev-list',
        f'--min-age={timestamp}', '-1',
        start_commit
    ]
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    # this should be US-ASCII hexadecimal identifier
    result = process.stdout.read().decode('latin1').strip()
    # NOTE: does not handle errors correctly yet

    process.stdout.close()  # to avoid ResourceWarning: unclosed file <_io.BufferedReader name=3>
    process.wait()  # to avoid ResourceWarning: subprocess NNN is still running

    return result

find_roots(start_from=StartLogFrom.CURRENT)

Find root commits (commits without parents), starting from start_from

:param start_from: where to start from to follow 'parent' links :type start_from: str or StartLogFrom :return: list of root commits, as SHA-1 :rtype: list[str]

Source code in src/diffannotator/utils/git.py
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
def find_roots(self, start_from: str = StartLogFrom.CURRENT) -> list[str]:
    """Find root commits (commits without parents), starting from `start_from`

    :param start_from: where to start from to follow 'parent' links
    :type start_from: str or StartLogFrom
    :return: list of root commits, as SHA-1
    :rtype: list[str]
    """
    if hasattr(start_from, 'value'):
        start_from = start_from.value
    elif start_from is None:
        start_from = 'HEAD'

    cmd = [
        'git', '-C', self.repo,
        'rev-list', '--max-parents=0',  # gives all root commits
        str(start_from),
    ]
    process = subprocess.run(cmd,
                             capture_output=True, check=True,
                             # the Git command above returns list of commit identifiers
                             # separated by newlines, therefore no '\r' in output possible
                             text=True, errors=self.encoding_errors)
    return process.stdout.splitlines()

format_patch(output_dir=None, revision_range=('-1', 'HEAD'))

Generate patches out of specified revisions, saving them as individual files

:param output_dir: output directory for patches; if not set (the default), save patches in the current working directory :param revision_range: arguments to pass to git format-patch, see https://git-scm.com/docs/git-format-patch; by default generates single patch from the HEAD :return: output from the git format-patch process

Source code in src/diffannotator/utils/git.py
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
def format_patch(self,
                 output_dir: Optional[PathLike] = None,
                 revision_range: Union[str, Iterable[str]] = ('-1', 'HEAD')) -> str:
    """Generate patches out of specified revisions, saving them as individual files

    :param output_dir: output directory for patches; if not set (the default),
        save patches in the current working directory
    :param revision_range: arguments to pass to `git format-patch`, see
        https://git-scm.com/docs/git-format-patch; by default generates single patch
        from the HEAD
    :return: output from the `git format-patch` process
    """
    # NOTE: it should be ':param \*args' or ':param \\*args', but for the bug in PyCharm
    cmd = [
        'git', '-C', str(self.repo),
        'format-patch'
    ]
    if output_dir is not None:
        cmd.extend([
            '--output-directory', str(output_dir)
        ])
    if isinstance(revision_range, str):
        cmd.append(revision_range)
    else:
        cmd.extend(revision_range)

    # NOTE: specifying `encoding` or `errors` turns on `text` == `universal_newlines`
    # and you cannot say `text=False` and/or `universal_newlines=False` to turn it off
    # The output of the `git format-patch` command can contain embedded `\r` (CR)
    process = subprocess.run(cmd,
                             capture_output=True, check=True)
    # MAYBE: better checks for process.returncode, and examine process.stderr
    if process.returncode == 0:
        return process.stdout.decode(encoding='utf-8', errors=self.encoding_errors)
    else:
        return process.stderr.decode(encoding='utf-8', errors=self.encoding_errors)

get_commit_metadata(commit='HEAD')

Retrieve metadata about given commit

    :param str commit: The commit to examine.
        Defaults to 'HEAD', that is the current (most recent) commit.
    :return: Information about selected parts of commit metadata,
        the following format:

        {
            'id': 'f8ffd4067d1f1b902ae06c52db4867f57a424f38',
            'parents': ['fe4a622e5202cd990c8ec853d56e25922f263243'],
            'tree': '5347fe7b8606e7a164ab5cd355ee5d86c99796c0'
            'author': {
                'author': 'A U Thor <author@example.com>',
                'name': 'A U Thor',
                'email': 'author@example.com',
                'timestamp': 1112912053,
                'tz_info': '-0600',
            },
            'committer': {
                'committer': 'C O Mitter <committer@example.com>'
                'name': 'C O Mitter',
                'email': 'committer@example.com',
                'timestamp': 1693598847,
                'tz_info': '+0200',
            },
            'message': 'Commit summary

Optional longer description ', }

        TODO: use dataclass for result (for computed fields)

    :rtype: dict
Source code in src/diffannotator/utils/git.py
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
def get_commit_metadata(self, commit: str = 'HEAD') -> dict[str, Union[str, dict, list]]:
    """Retrieve metadata about given commit

    :param str commit: The commit to examine.
        Defaults to 'HEAD', that is the current (most recent) commit.
    :return: Information about selected parts of commit metadata,
        the following format:

        {
            'id': 'f8ffd4067d1f1b902ae06c52db4867f57a424f38',
            'parents': ['fe4a622e5202cd990c8ec853d56e25922f263243'],
            'tree': '5347fe7b8606e7a164ab5cd355ee5d86c99796c0'
            'author': {
                'author': 'A U Thor <author@example.com>',
                'name': 'A U Thor',
                'email': 'author@example.com',
                'timestamp': 1112912053,
                'tz_info': '-0600',
            },
            'committer': {
                'committer': 'C O Mitter <committer@example.com>'
                'name': 'C O Mitter',
                'email': 'committer@example.com',
                'timestamp': 1693598847,
                'tz_info': '+0200',
            },
            'message': 'Commit summary\n\nOptional longer description\n',
        }

        TODO: use dataclass for result (for computed fields)

    :rtype: dict
    """
    # NOTE: using low level git 'plumbing' command means 'utf8' encoding is not assured
    # same as in `parse_commit` in gitweb/gitweb.perl in https://github.com/git/git
    # https://github.com/git/git/blob/3525f1dbc18ae36ca9c671e807d6aac2ac432600/gitweb/gitweb.perl#L3591C5-L3591C17
    cmd = [
        'git', '-C', self.repo, 'rev-list',
        '--parents', '--header', '--max-count=1', commit,
        '--'
    ]
    process = subprocess.run(cmd, capture_output=True, check=True)
    return _parse_commit_text(
        process.stdout.decode(GitRepo.log_encoding, errors=self.encoding_errors),
        # next parameters depend on the git command used
        with_parents_line=True, indented_body=True
    )

get_config(name, value_type=None)

Query specific git config option

If there is no Git configuration variable named name, then it returns None.

:param str name: name of configuration option, for example 'remote.origin.url' or 'user.name' :param value_type: name of git type to canonicalize outgoing value, see https://git-scm.com/docs/git-config#Documentation/git-config.txt---typelttypegt optional :type value_type: Literal['bool', 'int', 'bool-or-int', 'path', 'expiry-date', 'color'] or None :return: value of requested git configuration variable :rtype: str or None

Source code in src/diffannotator/utils/git.py
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
def get_config(self, name: str, value_type: Optional[str] = None) -> Union[str, None]:
    """Query specific git config option

    If there is no Git configuration variable named `name`,
    then it returns None.

    :param str name: name of configuration option, for example
        'remote.origin.url' or 'user.name'
    :param value_type: name of git type to canonicalize outgoing value,
        see https://git-scm.com/docs/git-config#Documentation/git-config.txt---typelttypegt
        optional
    :type value_type: Literal['bool', 'int', 'bool-or-int', 'path', 'expiry-date', 'color'] or None
    :return: value of requested git configuration variable
    :rtype: str or None
    """
    cmd = [
        'git', '-C', self.repo,
        'config', str(name)
    ]
    if value_type is not None:
        cmd.append(f"--type={value_type}")

    try:
        process = subprocess.run(cmd,
                                 capture_output=True, check=True)
        return process.stdout.decode(errors=self.encoding_errors).strip()

    except subprocess.CalledProcessError as err:
        # This command will fail with non-zero status upon error. Some exit codes are:
        # - The section or key is invalid (ret=1),
        # - ...
        if err.returncode == 1:
            return None
        else:
            raise err

get_current_branch()

Return short name of the current branch

It returns name of the branch, e.g. "main", rather than fully qualified name (full name), e.g. "refs/heads/main".

Will return None if there is no current branch, that is if repo is in the 'detached HEAD' state.

:return: name of the current branch :rtype: str or None

Source code in src/diffannotator/utils/git.py
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
def get_current_branch(self) -> Union[str, None]:
    """Return short name of the current branch

    It returns name of the branch, e.g. "main", rather than fully
    qualified name (full name), e.g. "refs/heads/main".

    Will return None if there is no current branch, that is if
    repo is in the 'detached HEAD' state.

    :return: name of the current branch
    :rtype: str or None
    """
    cmd = [
        'git', '-C', self.repo,
        'symbolic-ref', '--quiet', '--short', 'HEAD'
    ]
    try:
        # Using '--quiet' means that the command would not issue an error message
        # but exit with non-zero status silently if HEAD is not a symbolic ref, but detached HEAD
        process = subprocess.run(cmd,
                                 capture_output=True, check=True,
                                 # branch names cannot contain '\r' (CR) character,
                                 # see https://git-scm.com/docs/git-check-ref-format
                                 text=True, errors=self.encoding_errors)
    except subprocess.CalledProcessError:
        return None

    return process.stdout.strip()

is_valid_commit(commit)

Check if commit is present in the repository as a commit

:param str commit: reference to a commit, for example "HEAD" or "main", or "fc6db4e600d633d6fc206217e70641bbb78cbc53^" :return: whether commit is a valid commit in repo :rtype: bool

Source code in src/diffannotator/utils/git.py
1195
1196
1197
1198
1199
1200
1201
1202
1203
def is_valid_commit(self, commit: str) -> bool:
    """Check if `commit` is present in the repository as a commit

    :param str commit: reference to a commit, for example "HEAD" or "main",
        or "fc6db4e600d633d6fc206217e70641bbb78cbc53^"
    :return: whether `commit` is a valid commit in repo
    :rtype: bool
    """
    return self.to_oid(str(commit) + '^{commit}') is not None

list_authors_shortlog(start_from=StartLogFrom.ALL)

List all authors using git-shortlog

Summarizes the history of the project by providing list of authors together with their commit counts. Uses git shortlog --summary internally.

:param start_from: where to start from to follow 'parent' links :type start_from: str or StartLogFrom :return: list of authors together with their commit count, in the 'SPACE* TAB ' format :rtype: list[str|bytes]

Source code in src/diffannotator/utils/git.py
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
def list_authors_shortlog(self, start_from: str = StartLogFrom.ALL) -> list[Union[str, bytes]]:
    """List all authors using git-shortlog

    Summarizes the history of the project by providing list of authors
    together with their commit counts.  Uses `git shortlog --summary`
    internally.

    :param start_from: where to start from to follow 'parent' links
    :type start_from: str or StartLogFrom
    :return: list of authors together with their commit count,
        in the 'SPACE* <count> TAB <author>' format
    :rtype: list[str|bytes]
    """
    if hasattr(start_from, 'value'):
        start_from = start_from.value
    elif start_from is None:
        start_from = '--all'
    cmd = [
        'git', '-C', self.repo,
        'shortlog',
        '--summary',  # Suppress commit description and provide a commit count summary only.
        '-n',  # Sort output according to the number of commits per author
        start_from,
    ]
    process = subprocess.run(cmd, capture_output=True, check=True)
    try:
        # try to return text
        return process.stdout.decode(GitRepo.log_encoding, errors=self.encoding_errors).splitlines()
    except UnicodeDecodeError:
        # if not possible, return bytes
        return process.stdout.splitlines()

list_changed_files(commit='HEAD', side=DiffSide.POST)

Retrieve list of files changed at given revision in repo

NOTE: not tested for merge commits, especially "evil merges" with respect to file names.

:param str commit: The commit for which to list changes. Defaults to 'HEAD', that is the current commit. The changes are relative to commit^, that is the previous commit (first parent of the given commit).

:param DiffSide side: Whether to use names of files in post-image (after changes) with side=DiffSide.POST, or pre-image names (before changes) with side=DiffSide.PRE. Renames are detected by Git.

:return: full path names of files changed in commit. :rtype: list[str]

Source code in src/diffannotator/utils/git.py
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
def list_changed_files(self, commit: str = 'HEAD',
                       side: DiffSide = DiffSide.POST) -> list[str]:
    """Retrieve list of files changed at given revision in repo

    NOTE: not tested for merge commits, especially "evil merges"
    with respect to file names.

    :param str commit:
        The commit for which to list changes.  Defaults to 'HEAD',
        that is the current commit.  The changes are relative to
        commit^, that is the previous commit (first parent of the
        given commit).

    :param DiffSide side:
        Whether to use names of files in post-image (after changes)
        with side=DiffSide.POST, or pre-image names (before changes)
        with side=DiffSide.PRE.  Renames are detected by Git.

    :return: full path names of files changed in `commit`.
    :rtype: list[str]
    """
    if side == DiffSide.PRE:
        changes_status = self.diff_file_status(commit)
        return [
            pre for (pre, _) in changes_status.keys()
            if pre is not None  # TODO: check how deleted files work with side=DiffSide.POST
        ]

    if side != DiffSide.POST:
        raise NotImplementedError(f"GitRepo.list_changed_files: unsupported side={side} parameter")

    # --no-commit-id is needed for 1-argument git-diff-tree
    cmd = [
        'git', '-C', self.repo, 'diff-tree', '-M',
        '-r', '--name-only', '--no-commit-id', '-z',
        commit
    ]
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    result = process.stdout.read() \
                 .decode(GitRepo.path_encoding) \
                 .split('\0')[:-1]
    process.stdout.close()  # to avoid ResourceWarning: unclosed file <_io.BufferedReader name=3>
    process.wait()  # to avoid ResourceWarning: subprocess NNN is still running

    return result

list_files(commit='HEAD')

Retrieve list of files at given revision in a repository

:param str commit: The commit for which to list all files. Defaults to 'HEAD', that is the current commit :return: List of full path names of all files in the repository. :rtype: list[str]

Source code in src/diffannotator/utils/git.py
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
def list_files(self, commit: str = 'HEAD') -> list[str]:
    """Retrieve list of files at given revision in a repository

    :param str commit:
        The commit for which to list all files.  Defaults to 'HEAD',
        that is the current commit
    :return: List of full path names of all files in the repository.
    :rtype: list[str]
    """
    args = [
        'git', '-C', str(self.repo), 'ls-tree',
        '-r', '--name-only', '--full-tree', '-z',
        commit
    ]
    # TODO: consider replacing with subprocess.run()
    process = subprocess.Popen(args, stdout=subprocess.PIPE)
    result = process.stdout.read() \
                 .decode(GitRepo.path_encoding) \
                 .split('\0')[:-1]
    process.stdout.close()  # to avoid ResourceWarning: unclosed file <_io.BufferedReader name=3>
    process.wait()  # to avoid ResourceWarning: subprocess NNN is still running
    # TODO: add error checking
    return result

list_tags()

Retrieve list of all tags in the repository

:return: List of all tags in the repository. :rtype: list[str]

Source code in src/diffannotator/utils/git.py
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
def list_tags(self) -> list[str]:
    """Retrieve list of all tags in the repository

    :return: List of all tags in the repository.
    :rtype: list[str]
    """
    cmd = ['git', '-C', self.repo, 'tag', '--list']
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    # NOTE: f.readlines() might be not the best solution
    tags = [line.decode(GitRepo.path_encoding).rstrip()
            for line in process.stdout.readlines()]

    process.stdout.close()  # to avoid ResourceWarning: unclosed file <_io.BufferedReader name=3>
    process.wait()  # to avoid ResourceWarning: subprocess NNN is still running

    return tags

log_p(revision_range=('-1', 'HEAD'), wrap=True)

log_p(
    revision_range: Union[str, Iterable[str]] = ...,
    wrap: Literal[True] = ...,
) -> Iterator[ChangeSet]
log_p(
    revision_range: Union[str, Iterable[str]] = ...,
    wrap: Literal[False] = ...,
) -> Iterator[str]
log_p(
    revision_range: Union[str, Iterable[str]] = ...,
    wrap: bool = ...,
) -> Union[Iterator[str], Iterator[ChangeSet]]

Generate commits with unified diffs for a given revision_range

If revision_range is not provided, it generates single most recent commit on the current branch.

The wrap parameter controls the output format. If true (the default), generate series of unidiff.PatchSet for commits changes. If false, generate series of raw commit + unified diff of commit changes (as str). This is similar to how unidiff() method works.

:param revision_range: arguments to pass to git log --patch, see https://git-scm.com/docs/git-log; by default generates single patch from the HEAD :param wrap: whether to wrap the result in PatchSet :return: the changes for given revision_range

Source code in src/diffannotator/utils/git.py
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
def log_p(self, revision_range=('-1', 'HEAD'), wrap=True):
    """Generate commits with unified diffs for a given `revision_range`

    If `revision_range` is not provided, it generates single most recent
    commit on the current branch.

    The `wrap` parameter controls the output format.  If true (the
    default), generate series of `unidiff.PatchSet` for commits changes.
    If false, generate series of raw commit + unified diff of commit
    changes (as `str`).  This is similar to how `unidiff()` method works.

    :param revision_range: arguments to pass to `git log --patch`, see
        https://git-scm.com/docs/git-log; by default generates single patch
        from the HEAD
    :param wrap: whether to wrap the result in PatchSet
    :return: the changes for given `revision_range`
    """
    def commit_with_patch(_commit_id: str, _commit_data: StringIO) -> ChangeSet:
        """Helper to create ChangeSet with from _commit_data stream"""
        _commit_data.seek(0)  # rewind to beginning for reading by the PatchSet constructor
        return ChangeSet(_commit_data, _commit_id)

    cmd = [
        'git', '-C', str(self.repo),
        # NOTE: `git rev-list` does not support --patch option
        'log', '--format=raw', '--diff-merges=first-parent', '--patch', '-z',  # log options
        '--find-renames', '--find-copies', '--find-copies-harder',  # diff options
    ]
    if isinstance(revision_range, str):
        cmd.append(revision_range)
    else:
        cmd.extend(revision_range)

    ## DEBUG (TODO: switch to logger.debug())
    #print(f"{cmd=}")

    # NOTE: specifying `encoding` or `errors` turns on `text` == `universal_newlines`
    # and you cannot say `text=False` and/or `universal_newlines=False` to turn it off
    # The output of the `git log -p` command can contain embedded `\r` (CR)
    process = subprocess.Popen(
        cmd,
        #bufsize=1,  # line buffered
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,  # TODO: consider capturing stderr
    )

    commit_data = StringIO()
    commit_id: Optional[str] = None
    # if we are waiting on the process, readline can return empty line
    # if the process ends, we can still have lines in buffer
    while (log_p_line_raw := process.stdout.readline()) or process.poll() is None:
        log_p_line = log_p_line_raw.decode(
            encoding='utf-8',
            errors=self.encoding_errors,
        )
        if log_p_line:
            if not commit_id and log_p_line[0] != '\0':
                # first line in output
                commit_id = log_p_line.strip()[7:]  # strip "commit "

            if log_p_line[0] == '\0':
                # end of old commit, start of new commit
                ## DEBUG (TODO: switch to logger.debug())
                #print(f"new commit: {log_p_line[1:]}", end="")
                # return old commit data
                if wrap:
                    yield commit_with_patch(commit_id, commit_data)
                else:
                    yield commit_data.getvalue()
                # start gathering data for a new commit
                commit_data.truncate(0)
                # strip the '\0' separator
                log_p_line = log_p_line[1:]
                commit_id = log_p_line.strip()[7:]  # strip "commit "

            # gather next line of commit data
            commit_data.write(log_p_line)

    if commit_data.tell() > 0:
        # there is gathered data from the last commit
        ## DEBUG (TODO: switch to logger.debug())
        #print("last commit")
        if wrap:
            yield commit_with_patch(commit_id, commit_data)
        else:
            yield commit_data.getvalue()

    return_code = process.wait()
    if return_code != 0:
        print(f"Error running 'git log' for {self.repo.name} repo, error code = {return_code}")
        print(f"- repository path: '{self.repo}'")

open_file(commit, path)

Open given file at given revision / tree as binary file

Works as a context manager, like pathlib.Path.open(): >>> with GitRepo('/path/to/repo').open_file('v1', 'example_file') as fpb: ... contents = fpb.read().decode('utf8') ...

:param str commit: The commit for which to return file contents. :param str path: Path to a file, relative to the top-level of the repository :return: file object, opened in binary mode :rtype: io.BufferedReader

Source code in src/diffannotator/utils/git.py
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
@contextmanager
def open_file(self, commit: str, path: str) -> BufferedReader:
    """Open given file at given revision / tree as binary file

    Works as a context manager, like `pathlib.Path.open()`:
        >>> with GitRepo('/path/to/repo').open_file('v1', 'example_file') as fpb:
        ...     contents = fpb.read().decode('utf8')
        ...

    :param str commit: The commit for which to return file contents.
    :param str path: Path to a file, relative to the top-level of the repository
    :return: file object, opened in binary mode
    :rtype: io.BufferedReader
    """
    process = self._file_contents_process(commit, path)
    try:
        yield process.stdout
    finally:
        # NOTE: does not handle errors correctly yet
        process.stdout.close()  # to avoid ResourceWarning: unclosed file <_io.BufferedReader name=3>
        process.wait()  # to avoid ResourceWarning: subprocess NNN is still running

resolve_symbolic_ref(ref='HEAD')

Return full name of reference ref symbolic ref points to

If ref is not symbolic reference (e.g. ref='HEAD' and detached HEAD state) it returns None.

:param str ref: name of the symbolic reference, e.g. "HEAD" :return: resolved ref :rtype: str or None

Source code in src/diffannotator/utils/git.py
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
def resolve_symbolic_ref(self, ref: str = 'HEAD') -> Union[str, None]:
    """Return full name of reference `ref` symbolic ref points to

    If `ref` is not symbolic reference (e.g. ref='HEAD' and detached
    HEAD state) it returns None.

    :param str ref: name of the symbolic reference, e.g. "HEAD"
    :return: resolved `ref`
    :rtype: str or None
    """
    cmd = [
        'git', '-C', self.repo,
        'symbolic-ref', '--quiet', str(ref)
    ]
    try:
        # Using '--quiet' means that the command would not issue an error message
        # but exit with non-zero status silently if `ref` is not a symbolic ref
        process = subprocess.run(cmd,
                                 capture_output=True, check=True,
                                 # branch names and symbolic refereces cannot contain '\r' (CR),
                                 # see https://git-scm.com/docs/git-check-ref-format
                                 text=True, errors=self.encoding_errors)
    except subprocess.CalledProcessError:
        return None

    return process.stdout.strip()

to_oid(obj)

Convert object reference to object identifier

Returns None if object obj is not present in the repository

:param str obj: object reference, for example "HEAD" or "main^", see e.g. https://git-scm.com/docs/gitrevisions :return: SHA-1 identifier of object, or None if object is not found :rtype: str or None

Source code in src/diffannotator/utils/git.py
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
def to_oid(self, obj: str) -> Union[str, None]:
    """Convert object reference to object identifier

    Returns None if object `obj` is not present in the repository

    :param str obj: object reference, for example "HEAD" or "main^",
        see e.g. https://git-scm.com/docs/gitrevisions
    :return: SHA-1 identifier of object, or None if object is not found
    :rtype: str or None
    """
    cmd = [
        'git', '-C', self.repo,
        'rev-parse', '--verify', '--end-of-options', obj
    ]
    try:
        # emits SHA-1 identifier if object is found in the repo; otherwise, errors out
        process = subprocess.run(cmd, capture_output=True, check=True)
    except subprocess.CalledProcessError:
        return None

    # SHA-1 is ASCII only
    return process.stdout.decode('latin1').strip()

unidiff(commit='HEAD', prev=None, wrap=True)

unidiff(
    commit: str = ...,
    prev: Optional[str] = ...,
    wrap: Literal[True] = ...,
) -> ChangeSet
unidiff(
    commit: str = ...,
    prev: Optional[str] = ...,
    *,
    wrap: Literal[False]
) -> Union[str, bytes]
unidiff(
    commit: str = ...,
    prev: Optional[str] = ...,
    wrap: bool = ...,
) -> Union[str, bytes, ChangeSet]

Return unified diff between commit and prev

If prev is None (which is the default), return diff between the commit and its first parent, or between the commit and the empty tree if commit does not have any parents (if it is a root commit).

If wrap is True (which is the default), wrap the result in unidiff.PatchSet to make it easier to extract information from the diff. Otherwise, return diff as plain text.

:param str commit: later (second) of two commits to compare, defaults to 'HEAD', that is the current commit :param prev: earlier (first) of two commits to compare, defaults to None, which means comparing to parent of commit :type prev: str or None :param bool wrap: whether to wrap the result in PatchSet :return: the changes between two arbitrary commits, prev and commit :rtype: str or bytes or ChangeSet

Source code in src/diffannotator/utils/git.py
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
def unidiff(self, commit='HEAD', prev=None, wrap=True):
    """Return unified diff between `commit` and `prev`

    If `prev` is None (which is the default), return diff between the
    `commit` and its first parent, or between the `commit` and the empty
    tree if `commit` does not have any parents (if it is a root commit).

    If `wrap` is True (which is the default), wrap the result in
    unidiff.PatchSet to make it easier to extract information from
    the diff.  Otherwise, return diff as plain text.

    :param str commit: later (second) of two commits to compare,
        defaults to 'HEAD', that is the current commit
    :param prev: earlier (first) of two commits to compare,
        defaults to None, which means comparing to parent of `commit`
    :type prev: str or None
    :param bool wrap: whether to wrap the result in PatchSet
    :return: the changes between two arbitrary commits,
        `prev` and `commit`
    :rtype: str or bytes or ChangeSet
    """
    if prev is None:
        try:
            # NOTE: this means first-parent changes for merge commits
            return self.unidiff(commit=commit, prev=commit + '^', wrap=wrap)
        except subprocess.CalledProcessError:
            # commit^ does not exist for a root commits (for first commits)
            return self.unidiff(commit=commit, prev=self.empty_tree_sha1, wrap=wrap)

    cmd = [
        'git', '-C', self.repo,
        'diff', '--find-renames', '--find-copies', '--find-copies-harder',
        prev, commit
    ]
    process = subprocess.run(cmd,
                             capture_output=True, check=True)
    try:
        diff_output = process.stdout.decode(self.default_file_encoding)
    except UnicodeDecodeError:
        # unidiff.PatchSet can only handle strings
        diff_output = process.stdout.decode(self.fallback_encoding)

    if wrap:
        return ChangeSet(diff_output, self.to_oid(commit),
                         prev=prev)
    else:
        return diff_output

StartLogFrom

Bases: Enum

Enum to be used for special cases for starting point of 'git log'

Source code in src/diffannotator/utils/git.py
54
55
56
57
58
class StartLogFrom(Enum):
    """Enum to be used for special cases for starting point of 'git log'"""
    CURRENT = 'HEAD'
    HEAD = 'HEAD'  # alias
    ALL = '--all'

decode_c_quoted_str(text)

C-style name unquoting

See unquote_c_style() function in 'quote.c' file in git/git source code https://github.com/git/git/blob/master/quote.c#L401

This is subset of escape sequences supported by C and C++ https://learn.microsoft.com/en-us/cpp/c-language/escape-sequences

:param str text: string which may be c-quoted :return: decoded string :rtype: str

Source code in src/diffannotator/utils/git.py
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
def decode_c_quoted_str(text: str) -> str:
    """C-style name unquoting

    See unquote_c_style() function in 'quote.c' file in git/git source code
    https://github.com/git/git/blob/master/quote.c#L401

    This is subset of escape sequences supported by C and C++
    https://learn.microsoft.com/en-us/cpp/c-language/escape-sequences

    :param str text: string which may be c-quoted
    :return: decoded string
    :rtype: str
    """
    # TODO?: Make it a global variable
    escape_dict = {
        'a': '\a',  # Bell (alert)
        'b': '\b',  # Backspace
        'f': '\f',  # Form feed
        'n': '\n',  # New line
        'r': '\r',  # Carriage return
        't': '\t',  # Horizontal tab
        'v': '\v',  # Vertical tab
    }

    quoted = text.startswith('"') and text.endswith('"')
    if quoted:
        text = text[1:-1]  # remove quotes

        buf = bytearray()
        escaped = False  # TODO?: switch to state = 'NORMAL', 'ESCAPE', 'ESCAPE_OCTAL'
        oct_str = ''

        for ch in text:
            if not escaped:
                if ch != '\\':
                    buf.append(ord(ch))
                else:
                    escaped = True
                    oct_str = ''
            else:
                if ch in ('"', '\\'):
                    buf.append(ord(ch))
                    escaped = False
                elif ch in escape_dict:
                    buf.append(ord(escape_dict[ch]))
                    escaped = False
                elif '0' <= ch <= '7':  # octal values with first digit over 4 overflow
                    oct_str += ch
                    if len(oct_str) == 3:
                        byte = int(oct_str, base=8)  # byte in octal notation
                        if byte > 256:
                            raise ValueError(f'Invalid octal escape sequence \\{oct_str} in "{text}"')
                        buf.append(byte)
                        escaped = False
                        oct_str = ''
                else:
                    raise ValueError(f'Unexpected character \'{ch}\' in escape sequence when parsing "{text}"')

        if escaped:
            raise ValueError(f'Unfinished escape sequence when parsing "{text}"')

        text = buf.decode(errors=ENCODING_ERRORS)

    return text

parse_shortlog_count(shortlog_lines)

Parse the result of GitRepo.list_authors_shortlog() method

:param shortlog_lines: result of list_authors_shortlog() :type shortlog_lines: str or bytes :return: list of parsed statistics, number of commits per author :rtype: list[AuthorStat]

Source code in src/diffannotator/utils/git.py
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
def parse_shortlog_count(shortlog_lines: list[Union[str, bytes]]) -> list[AuthorStat]:
    """Parse the result of GitRepo.list_authors_shortlog() method

    :param shortlog_lines: result of list_authors_shortlog()
    :type shortlog_lines: str or bytes
    :return: list of parsed statistics, number of commits per author
    :rtype: list[AuthorStat]
    """
    result = []
    for line in shortlog_lines:
        count, author = line.split('\t' if isinstance(line, str) else b'\t', maxsplit=1)
        count = int(count.strip())
        result.append(AuthorStat(author, count))

    return result