1
1
# Copyright (C) 2018-2019 OpenIO SAS, as part of OpenIO SDS
2
+ # Copyright (C) 2022 OVH SAS
2
3
#
3
4
# This library is free software; you can redistribute it and/or
4
5
# modify it under the terms of the GNU Lesser General Public
21
22
from oio .common .utils import cid_from_name
22
23
from oio .common .constants import OIO_VERSION
23
24
from oio .common .fullpath import encode_fullpath
25
+ from oio .common .storage_method import ECDriverError
24
26
from oio .blob .rebuilder import BlobRebuilder
25
- from tests .utils import BaseTestCase , random_str
27
+ from tests .utils import BaseTestCase , random_data , random_str
26
28
from tests .functional .blob import convert_to_old_chunk
27
29
28
30
@@ -53,7 +55,8 @@ def setUp(self):
53
55
self .rawx_volumes [service_id ] = volume
54
56
55
57
self .api .object_create (
56
- self .account , self .container , obj_name = self .path , data = "chunk" )
58
+ self .account , self .container , obj_name = self .path ,
59
+ data = random_data (50 * 1024 ))
57
60
meta , self .chunks = self .api .object_locate (
58
61
self .account , self .container , self .path )
59
62
self .version = meta ['version' ]
@@ -66,6 +69,65 @@ def _chunk_path(self, chunk):
66
69
volume = self .rawx_volumes [volume_id ]
67
70
return volume + '/' + chunk_id [:3 ] + '/' + chunk_id
68
71
72
+ def _corrupt_chunk (self , chunk , offset = 7 ):
73
+ chunk_path = self ._chunk_path (chunk )
74
+ self .logger .debug ("Corrupting chunk %s" , chunk_path )
75
+ with open (chunk_path , 'rb+' ) as chunk_fd :
76
+ chunk_fd .seek (offset , os .SEEK_SET )
77
+ last_byte = chunk_fd .read (1 )
78
+ last_byte = chr ((ord (last_byte ) - 1 ) % 256 )
79
+ chunk_fd .seek (offset , os .SEEK_SET )
80
+ chunk_fd .write (last_byte )
81
+ chunk_fd .flush ()
82
+
83
+ def test_rebuild_with_corrupt_input (self ):
84
+ """
85
+ The the rebuild of a missing chunk while 2 other chunks are corrupt.
86
+ Notice that we corrupt the chunk's EC preamble, not the chunk's data
87
+ segment.
88
+ """
89
+ if self .conf ['storage_policy' ] != 'EC' \
90
+ or len (self .conf ['services' ]['rawx' ]) < 9 :
91
+ self .skipTest ("Will run only with 'EC' storage policy "
92
+ + "and at least 9 rawx services" )
93
+
94
+ # pick one chunk, remove it
95
+ removed_chunk = random .choice (self .chunks )
96
+ chunk_headers = self .blob_client .chunk_head (removed_chunk ['url' ])
97
+ removed_chunk_size = int (chunk_headers ['chunk_size' ])
98
+ os .remove (self ._chunk_path (removed_chunk ))
99
+ chunks_kept = list (self .chunks )
100
+ chunks_kept .remove (removed_chunk )
101
+
102
+ # pick two chunks, corrupt them
103
+ for _ in range (2 ):
104
+ corrupt_chunk = random .choice (chunks_kept )
105
+ chunks_kept .remove (corrupt_chunk )
106
+ self ._corrupt_chunk (corrupt_chunk )
107
+
108
+ # run the rebuilder, check failure
109
+ chunk_id = removed_chunk ['url' ].split ('/' )[3 ]
110
+ chunk_volume = removed_chunk ['url' ].split ('/' )[2 ]
111
+ conf = self .conf .copy ()
112
+ conf ['allow_same_rawx' ] = True
113
+ rebuilder = BlobRebuilder (conf , service_id = chunk_volume ,
114
+ logger = self .logger )
115
+ rebuilder_worker = rebuilder .create_worker (None , None )
116
+ self .assertRaises (
117
+ ECDriverError ,
118
+ rebuilder_worker ._process_item ,
119
+ (self .ns , self .cid , self .content_id , chunk_id ))
120
+
121
+ # run the rebuilder with options, check success
122
+ conf ['allow_same_rawx' ] = True
123
+ conf ['read_all_available_sources' ] = True
124
+ rebuilder = BlobRebuilder (conf , service_id = chunk_volume ,
125
+ logger = self .logger )
126
+ rebuilder_worker = rebuilder .create_worker (None , None )
127
+ rebuilt_bytes = rebuilder_worker ._process_item (
128
+ (self .ns , self .cid , self .content_id , chunk_id ))
129
+ self .assertEqual (removed_chunk_size , rebuilt_bytes )
130
+
69
131
def test_rebuild_old_chunk (self ):
70
132
for c in self .chunks :
71
133
convert_to_old_chunk (
0 commit comments