• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

IIIF / image-validator / 12285728456

11 Dec 2024 10:07PM UTC coverage: 40.498% (+2.5%) from 37.982%
12285728456

Pull #106

github

glenrobson
Moving to updating pypi via github actions
Pull Request #106: Adding github actions

699 of 1726 relevant lines covered (40.5%)

2.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

23.76
/iiif_validator/validator.py
1
"""IIIF Image API validator module."""
2

3
from functools import partial
5✔
4
from bottle import Bottle, route, run, request, response, abort, error
5✔
5
import inspect
5✔
6
import json
5✔
7
import sys
5✔
8
import random
5✔
9
import os
5✔
10
try:  # python2
5✔
11
    import BytesIO as io
5✔
12
    # Must check for python2 first as io exists but is wrong one
13
except ImportError:  # python3
5✔
14
    import io
5✔
15
try:
5✔
16
    # python3
17
    from urllib.request import urlopen, Request, HTTPError
5✔
18
    from urllib.error import URLError
5✔
19
except ImportError:
×
20
    # fall back to python2
21
    from urllib2 import urlopen, Request, HTTPError, URLError
×
22
try:
5✔
23
    from PIL import Image, ImageDraw
5✔
24
except:
×
25
    import Image, ImageDraw
×
26
from .tests.test import ValidatorError
5✔
27
from . import tests
5✔
28

29

30
class ValidationInfo(object):
    """Reference data and assertion helpers shared by the validator tests.

    Attributes:
        mimetypes: media type keyed by lower-case file extension.
        pil_formats: media type keyed by upper-case image format name
            (presumably the values PIL reports in ``Image.format`` - confirm).
        colorInfo: 10x10 grid of ``(r, g, b)`` tuples, indexed as
            ``colorInfo[x][y]``, that ``do_test_square`` compares against.
    """

    def __init__(self):
        self.mimetypes = {
            'bmp': 'image/bmp',
            'gif': 'image/gif',
            'jpg': 'image/jpeg',
            'pcx': 'image/pcx',
            'pdf': 'application/pdf',
            'png': 'image/png',
            'tif': 'image/tiff',
            'webp': 'image/webp',
        }

        self.pil_formats = {
            'BMP': 'image/bmp',
            'GIF': 'image/gif',
            'JPEG': 'image/jpeg',
            'PCX': 'image/pcx',
            'PDF': 'application/pdf',
            'PNG': 'image/png',
            'TIFF': 'image/tiff',
        }

        self.colorInfo = [
            [(61, 170, 126), (61, 107, 178), (82, 85, 234), (164, 122, 110), (129, 226, 88), (91, 37, 121), (138, 128, 42), (6, 85, 234), (121, 109, 204), (65, 246, 84)],
            [(195, 133, 120), (171, 43, 102), (118, 45, 130), (242, 105, 171), (5, 85, 105), (113, 58, 41), (223, 69, 3), (45, 79, 140), (35, 117, 248), (121, 156, 184)],
            [(168, 92, 163), (28, 91, 143), (86, 41, 173), (111, 230, 29), (174, 189, 7), (18, 139, 88), (93, 168, 128), (35, 2, 14), (204, 105, 137), (18, 86, 128)],
            [(107, 55, 178), (251, 40, 184), (47, 36, 139), (2, 127, 170), (224, 12, 114), (133, 67, 108), (239, 174, 209), (85, 29, 156), (8, 55, 188), (240, 125, 7)],
            [(112, 167, 30), (166, 63, 161), (232, 227, 23), (74, 80, 135), (79, 97, 47), (145, 160, 80), (45, 160, 79), (12, 54, 215), (203, 83, 70), (78, 28, 46)],
            [(102, 193, 63), (225, 55, 91), (107, 194, 147), (167, 24, 95), (249, 214, 96), (167, 34, 136), (53, 254, 209), (172, 222, 21), (153, 77, 51), (137, 39, 183)],
            [(159, 182, 192), (128, 252, 173), (148, 162, 90), (192, 165, 115), (154, 102, 2), (107, 237, 62), (111, 236, 219), (129, 113, 172), (239, 204, 166), (60, 96, 37)],
            [(72, 172, 227), (119, 51, 100), (209, 85, 165), (87, 172, 159), (188, 42, 162), (99, 3, 54), (7, 42, 37), (105, 155, 100), (38, 220, 240), (98, 46, 2)],
            [(18, 223, 145), (189, 121, 17), (88, 3, 210), (181, 16, 43), (189, 39, 244), (123, 147, 116), (246, 148, 214), (223, 177, 199), (77, 18, 136), (235, 36, 21)],
            [(146, 137, 176), (84, 248, 55), (61, 144, 79), (110, 251, 49), (43, 105, 132), (165, 131, 55), (60, 23, 225), (147, 197, 226), (80, 67, 104), (161, 119, 182)],
        ]

    def do_test_square(self, img, x, y, result):
        """Compare the most frequent colour of ``img`` with ``colorInfo[x][y]``.

        Args:
            img: image object providing ``getcolors(maxcolors=...)`` and
                ``getpalette()`` (a PIL image in practice).
            x, y: indexes into ``self.colorInfo``.
            result: object whose ``tests`` list receives an ``"x,y:ok"`` entry.

        Returns:
            True when every channel of the dominant colour is within 5 of
            the reference value, False otherwise (including when the image
            reports no colour list at all).
        """
        truth = self.colorInfo[x][y]
        # Similarity, not necessarily perceived
        cols = img.getcolors(maxcolors=1000000)  # 1kx1k image so <=1M colors
        if not cols:
            # getcolors() yields None when the colour count exceeds maxcolors;
            # report a failed square rather than crashing on None.
            ok = False
        else:
            # max() picks the same entry the reverse sort used to put first.
            col = max(cols)[1]
            # If image has palette, col is int and we look up [r,g,b]
            pal = img.getpalette()
            if pal:
                col = [pal[col * 3], pal[col * 3 + 1], pal[col * 3 + 2]]
            elif isinstance(col, int):
                # Single-band image without a palette: use the value for
                # all three channels instead of failing on col[0].
                col = (col, col, col)
            ok = all(abs(col[i] - truth[i]) < 6 for i in range(3))
        result.tests.append("%s,%s:%s" % (x, y, ok))
        return ok

    def make_randomstring(self, length):
        """Return ``length`` random characters drawn from code points 48-122.

        The characters ``?``, ``#`` and ``/`` are replaced with ``$`` so the
        string never terminates a URL path segment.
        """
        val = ''.join(chr(random.randint(48, 122)) for _ in range(length))
        # prevent end-of-path-segment characters
        for reserved in ('?', '#', '/'):
            val = val.replace(reserved, '$')
        return val

    def check(self, typ, got, expected, result=None, errmsg="", warning=False):
        """Assert that ``got`` matches ``expected``.

        Args:
            typ: label for the check; appended to ``result.tests`` on success.
            got: the observed value.
            expected: the required value, or a ``list`` of acceptable values.
                Only lists are treated as alternatives; tuples are compared
                for equality as a whole.
            result: optional result object with a ``tests`` list.
            errmsg: passed through to ``ValidatorError``.
            warning: passed through to ``ValidatorError``.

        Returns:
            1 when the check passes.

        Raises:
            ValidatorError: when ``got`` is not acceptable.
        """
        if isinstance(expected, list):
            matched = got in expected
        else:
            matched = got == expected
        if not matched:
            raise ValidatorError(typ, got, expected, result, errmsg, warning)
        if result:
            result.tests.append(typ)
        return 1
95

96
        
97
class TestSuite(object):
    """Registry of the validator test classes found in the ``tests`` package."""

    def __init__(self, info):
        """Collect one ``Test_*`` class per module exposed by ``tests``.

        Args:
            info: shared validation helper handed to every test instance.
        """
        self.validationInfo = info
        # Look at all modules imported as tests, find Test_* classes,
        # take only the first class from any module
        self.all_tests = {}
        for module_name, module in inspect.getmembers(tests, inspect.ismodule):
            for name, klass in inspect.getmembers(module, inspect.isclass):
                if name.startswith('Test_'):
                    self.all_tests[module_name] = klass
                    break

    def has_test(self, test):
        """Return True when ``test`` names a runnable test.

        The lookup uses the same registry as ``run_test`` so that a name
        accepted here can never raise ``KeyError`` there (attributes of the
        package such as ``__name__`` are not tests).
        """
        return test in self.all_tests

    def list_tests(self, version=""):
        """Return ``{test name: info}`` for tests whose ``make_info(version)`` is truthy."""
        allt = {}
        for name, klass in self.all_tests.items():
            data = klass.make_info(version)
            if data:
                allt[name] = data
        return allt

    def run_test(self, test_name, result):
        """Run the named test against ``result``.

        Args:
            test_name: key into ``self.all_tests``.
            result: result object; receives ``test_info`` and, if the test
                raises ``ValidatorError``, the exception in ``exception``.

        Returns:
            Whatever the test's ``run`` returns, or ``result`` itself when a
            ``ValidatorError`` was caught.

        Raises:
            KeyError: if ``test_name`` is not registered.
        """
        klass = self.all_tests[test_name]
        test = klass(self.validationInfo)

        result.test_info = test.make_info(result.version)

        try:
            return test.run(result)
        except ValidatorError as e:
            result.exception = e
            return result
135
 
136
class ImageAPI(object):
    """Client for one image on an IIIF Image API server.

    The same object doubles as the result record for a test run: tests
    append to ``urls`` and ``tests`` and the suite stores ``exception`` and
    ``test_info`` on it.
    """

    def __init__(self, identifier, server, prefix="", scheme="http", auth="", version="2.0", debug=True):
        """Store the connection settings.

        Args:
            identifier: image identifier used in generated URLs.
            server: host part of generated URLs.
            prefix: optional ``/``-separated path prefix; kept as a list of
                segments, or ``""`` when empty.
            scheme: URL scheme.
            auth: stored on the instance; not used by the methods here.
            version: Image API version string; selects URL defaults.
            debug: when true, ``make_url`` prints every URL it builds.
        """
        self.iiifNS = "{http://library.stanford.edu/iiif/image-api/ns/}"
        self.debug = debug

        self.scheme = scheme
        self.server = server
        self.prefix = prefix.split('/') if prefix else ""
        self.identifier = identifier
        self.auth = auth

        self.version = version

        self.last_headers = {}
        self.last_status = 0
        self.last_url = ''

        # DOUBLE duty as result object
        self.name = ""
        self.urls = []
        self.tests = []
        self.exception = None
        # Filled in by the test suite before a test runs.
        self.test_info = None

    def parse_links(self, header):
        """Parse an HTTP ``Link`` header value.

        Args:
            header: the raw header value, e.g.
                ``<http://a>; rel="profile", <http://b>; type=text/html``.

        Returns:
            ``{uri: {param: [values]}}``. ``rel`` values are split on spaces
            and lower-cased; other parameters keep each distinct value once.

        Raises:
            ValueError: when the header is malformed.
        """
        text = header.strip()
        total = len(text)
        links = {}

        def skip_space(i):
            while i < total and text[i].isspace():
                i += 1
            return i

        pos = 0
        while pos < total:
            # --- link target: <uri> ---
            pos = skip_space(pos)
            if pos >= total:
                break
            if text[pos] != "<":
                raise ValueError("Parsing Link Header: Expected < in start, got %s" % text[pos])
            # Read up to the closing bracket (not the first ';') so that
            # URIs containing ';' and '<uri> ;' spacing are handled.
            close = text.find(">", pos + 1)
            if close < 0:
                raise ValueError("Parsing Link Header: Expected > to close URI")
            uri = text[pos + 1:close]
            pos = close + 1
            # Not an error to have the same URI multiple times (I think!)
            params = links.setdefault(uri, {})

            # --- parameters: ; name = value ---
            while True:
                pos = skip_space(pos)
                if pos >= total:
                    return links
                sep = text[pos]
                pos += 1
                if sep == ',':
                    break  # next link
                if sep != ";":
                    raise ValueError("Parsing Link Header: Expected ; in paramstart, got %s" % sep)

                pos = skip_space(pos)
                if pos >= total:
                    # Tolerate a trailing ';' as the original parser did.
                    return links
                name_start = pos
                while pos < total and not text[pos].isspace() and text[pos] != "=":
                    pos += 1
                pt = text[name_start:pos]
                pos = skip_space(pos)
                if pos >= total or text[pos] != "=":
                    got = text[pos] if pos < total else 'end of header'
                    raise ValueError("Parsing Link Header: Expected = in linkparam, got %s" % got)
                pos = skip_space(pos + 1)
                values = params.setdefault(pt, [])

                if pos < total and text[pos] == '"':
                    # Quoted value: ends at the first quote that is not
                    # escaped with a backslash.
                    pos += 1
                    chars = []
                    while True:
                        if pos >= total:
                            raise ValueError("Parsing Link Header: Unterminated quoted value")
                        ch = text[pos]
                        if ch == '"':
                            pos += 1
                            break
                        if ch == '\\' and pos + 1 < total:
                            # quoted-pair: keep the escaped character itself
                            pos += 1
                            ch = text[pos]
                        chars.append(ch)
                        pos += 1
                    pv = ''.join(chars)
                else:
                    value_start = pos
                    while pos < total and not text[pos].isspace() and text[pos] not in (',', ';'):
                        pos += 1
                    pv = text[value_start:pos]

                if pt == 'rel':
                    # rel types are case insensitive and space separated
                    values.extend(y.lower() for y in pv.split(' '))
                elif pv not in values:
                    values.append(pv)
        return links

    def get_uri_for_rel(self, links, rel):
        """Return the first URI in ``links`` carrying relation ``rel``, or None.

        ``links`` has the shape returned by ``parse_links``; the comparison
        is done on the lower-cased ``rel``.
        """
        rel = rel.lower()
        for (uri, info) in links.items():
            if rel in info.get('rel', []):
                return uri
        return None

    def fetch(self, url):
        """GET ``url`` and return the response body.

        HTTP error responses are not raised; their body is returned like any
        other. As a side effect ``last_headers``, ``last_status`` and
        ``last_url`` are updated and ``url`` is appended to ``self.urls``.

        Raises:
            Exception: any non-HTTP failure from ``urlopen`` is printed and
                re-raised unchanged.
        """
        # Make it look like a real browser request
        request_headers = {
            "Origin": "http://iiif.io/",
            "Referer": "http://iiif.io/api/image/validator",
            "User-Agent": "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10.5; en-US; rv:1.9.1b3pre) Gecko/20081130 Minefield/3.1b3pre",
        }
        req = Request(url, headers=request_headers)

        try:
            wh = urlopen(req, timeout=5)
        except HTTPError as e:
            # An HTTPError is itself a readable response object.
            wh = e
        except Exception as error:
            print('Other type of error ({}): {}'.format(type(error), error))
            raise

        try:
            data = wh.read()
            try:  # py2
                headers = wh.headers.dict
            except AttributeError:  # py3
                headers = wh.info()
            status = wh.code
        finally:
            # Always release the connection, even if reading failed.
            wh.close()

        # nasty side effect
        self.last_headers = headers
        self.last_status = status
        self.last_url = url
        self.urls.append(url)
        return data

    def make_url(self, params=None):
        """Build an image request URL.

        Args:
            params: optional dict overriding any of ``identifier``,
                ``region``, ``size``, ``rotation``, ``quality``, ``format``,
                ``scheme`` and ``server``. The dict is copied, never
                modified.

        Returns:
            ``scheme://server/[prefix/]identifier/region/size/rotation/quality[.format]``.
            Missing parts default according to ``self.version``.
        """
        # Work on a private copy: the previous shared default dict leaked
        # values between calls and instances.
        params = dict(params) if params else {}
        modern = self.version in ("2.0", "3.0")

        if self.prefix and 'prefix' not in params:
            params['prefix'] = self.prefix
        params.setdefault('identifier', self.identifier)
        params.setdefault('region', 'full')
        if 'size' not in params:
            params['size'] = 'max' if self.version == "3.0" else 'full'
        params.setdefault('rotation', '0')
        if 'quality' not in params:
            params['quality'] = 'default' if modern else 'native'
        elif params['quality'] == 'grey' and modern:
            # en-us in 2.0+
            params['quality'] = 'gray'
        if 'format' not in params and modern:
            # format is required in 2.0+
            params['format'] = 'jpg'

        if 'prefix' in params:
            # NOTE(review): a caller-supplied prefix is replaced by the
            # instance prefix, as before - confirm this is intended.
            params['prefix'] = '/'.join(self.prefix)

        order = ('prefix', 'identifier', 'region', 'size', 'rotation', 'quality')
        url = '/'.join(params[p] for p in order if params.get(p) is not None)

        if params.get('format') is not None:
            url += '.%s' % params['format']

        scheme = params.get('scheme', self.scheme)
        server = params.get('server', self.server)
        url = "%s://%s/%s" % (scheme, server, url)
        if self.debug:
            print(url)
        return url

    def make_image(self, data):
        """Open the bytes in ``data`` as an image via ``Image.open``."""
        return Image.open(io.BytesIO(data))

    def get_image(self, params):
        """Fetch the URL built from ``params`` and return it as an image."""
        url = self.make_url(params)
        return self.make_image(self.fetch(url))

    def make_info_url(self, format='json'):
        """Return the URL of the image's ``info.<format>`` document."""
        parts = list(self.prefix) if self.prefix else []
        parts.extend([self.identifier, 'info'])
        path = '%s.%s' % ('/'.join(parts), format)
        return "%s://%s/%s" % (self.scheme, self.server, path)

    def get_info(self):
        """Fetch and decode the info document.

        Returns:
            The parsed JSON, or None when the fetch or the decoding fails
            (failures are deliberately swallowed).
        """
        url = self.make_info_url()
        try:
            idata = self.fetch(url)
        except Exception:
            # Best effort: callers treat None as "no info available".
            return None
        try:
            return json.loads(idata.decode('utf-8'))
        except Exception:
            return None
364

365

366
class Validator(object):
    """Bottle web application exposing the validator tests over HTTP."""

    def __init__(self, debug=True):
        """Create the validator; with ``debug`` a start-up line goes to stderr."""
        if debug:
            sys.stderr.write('init on Validator\n')
            sys.stderr.flush()

    @staticmethod
    def _split_server(server):
        """Split the raw ``server`` parameter into ``(scheme, auth, host)``.

        A leading ``http://`` / ``https://`` selects the scheme (default
        ``http``), anything before the first ``@`` is returned as ``auth``
        and one trailing ``/`` is dropped from the host.
        """
        server = server.strip()
        if server.startswith('https://'):
            scheme = 'https'
            server = server[len('https://'):]
        else:
            scheme = "http"
            # Only strip the scheme at the start, never inside the value.
            if server.startswith('http://'):
                server = server[len('http://'):]
        atidx = server.find('@')
        if atidx > -1:
            auth = server[:atidx]
            server = server[atidx + 1:]
        else:
            auth = ""
        if server.endswith('/'):
            server = server[:-1]
        return scheme, auth, server

    @staticmethod
    def _normalise_prefix(prefix):
        """Decode ``%2F`` in ``prefix`` and drop surrounding slashes.

        Safe for inputs such as ``"/"`` that reduce to an empty string.
        """
        return prefix.strip().replace('%2F', '/').strip('/')

    def handle_test(self, testname):
        """Run ``testname`` against the server named in the query string.

        Query parameters: ``version`` (default ``2.0``), ``server`` and
        ``identifier`` (both mandatory) and ``prefix`` (optional).
        ``list_tests`` as test name returns the catalogue of tests instead.

        Returns:
            A JSON string describing the outcome.

        Aborts with 404 for an unknown test and 400 for a missing mandatory
        parameter. Any unexpected exception propagates to bottle.
        """
        version = request.query.get('version', '2.0')
        info = ValidationInfo()
        testSuite = TestSuite(info)

        if testname == "list_tests":
            return json.dumps(testSuite.list_tests(version))
        if not testSuite.has_test(testname):
            abort(404, "No such test: %s" % testname)

        scheme, auth, server = self._split_server(request.query.get('server', ''))
        if not server:
            abort(400, "Missing mandatory parameter: server")

        prefix = self._normalise_prefix(request.query.get('prefix', ''))

        identifier = request.query.get('identifier', '').strip()
        if not identifier:
            abort(400, "Missing mandatory parameter: identifier")

        result = ImageAPI(identifier, server, prefix, scheme, auth, version)
        testSuite.run_test(testname, result)
        if result.exception:
            e = result.exception
            info = {'test': testname, 'status': 'error', 'url': result.urls, 'got': e.got, 'expected': e.expected, 'type': e.type, 'message': e.message, 'warning': e.warning}
        else:
            info = {'test': testname, 'status': 'success', 'url': result.urls, 'tests': result.tests}
        if result.test_info:
            info['label'] = result.test_info['label']

        return json.dumps(info)

    def dispatch_views(self):
        """Register ``handle_test`` for ``GET /<testname>`` on ``self.app``."""
        pfx = ""
        self.app.route("/%s<testname>" % pfx, "GET", self.handle_test)

    def after_request(self):
        """A bottle hook for json responses."""
        response["content_type"] = "application/json"
        methods = 'GET'
        headers = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'
        # Already added by apache config
        response.headers['Access-Control-Allow-Origin'] = '*'
        response.headers['Access-Control-Allow-Methods'] = methods
        response.headers['Access-Control-Allow-Headers'] = headers
        response.headers['Allow'] = methods
        # Add no cache so CloudFront doesn't cache response
        response.headers['Cache-Control'] = 'no-cache'
        response.headers['Pragma'] = 'no-cache'

    def not_implemented(self, *args, **kwargs):
        """Returns not implemented status."""
        abort(501)

    def empty_response(self, *args, **kwargs):
        """Empty response"""

    options_list = empty_response
    options_detail = empty_response

    def error(self, error, message=None):
        """Returns the error response.

        The JSON body carries the status code and ``error.body``, falling
        back to ``message`` when the body is empty.
        """
        data = json.dumps({"error": error.status_code, "message": error.body or message})
        # add content-type and CORS headers to error
        self.after_request()
        return data

    def get_error_handler(self):
        """Customized errors"""
        return {
            500: partial(self.error, message="Internal Server Error."),
            404: partial(self.error, message="Document Not Found."),
            501: partial(self.error, message="Not Implemented."),
            405: partial(self.error, message="Method Not Allowed."),
            403: partial(self.error, message="Forbidden."),
            400: self.error
        }

    def get_bottle_app(self):
        """Returns bottle instance"""
        self.app = Bottle()
        self.dispatch_views()
        self.app.hook('after_request')(self.after_request)
        self.app.error_handler = self.get_error_handler()
        return self.app
490

491

492
def apache():
    """Return the bottle app of a freshly created Validator.

    NOTE(review): presumably the entry point used by the Apache/WSGI
    deployment - confirm against the server configuration.
    """
    return Validator().get_bottle_app()
495

496
def main():
    """Serve the validator on localhost:8080 with bottle's reloader enabled."""
    application = Validator().get_bottle_app()
    run(app=application, host='localhost', port=8080, reloader=True)
499

500
if __name__ == "__main__":
5✔
501
    main()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc