• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WPI-LNL / lnldb / 6621333835

24 Oct 2023 01:54AM UTC coverage: 91.077% (-0.6%) from 91.657%
6621333835

push

github

web-flow
Merge pull request #820 from WPI-LNL/remove-workorder-wizard

lnldb/urls: Remove workorder wizard route and related test cases

14729 of 16172 relevant lines covered (91.08%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.16
/data/views.py
1
import json
1✔
2
import mimetypes
1✔
3
import os
1✔
4
import stat
1✔
5

6
from django.conf import settings
1✔
7
from django.contrib.auth.decorators import login_required
1✔
8
from django.db import transaction
1✔
9
from django.http import FileResponse, HttpResponse, HttpResponseNotModified
1✔
10
from django.shortcuts import render
1✔
11
from django.urls import reverse
1✔
12
from django.utils import timezone
1✔
13
from django.utils.datastructures import MultiValueDictKeyError
1✔
14
from django.utils.dateparse import parse_datetime
1✔
15
from django.utils.http import http_date, urlquote_plus
1✔
16
from django.views.decorators.csrf import csrf_exempt
1✔
17
from django.views.decorators.http import require_GET, require_POST
1✔
18
from django.views.static import was_modified_since
1✔
19
from fuzzywuzzy import fuzz, process
1✔
20
import reversion
1✔
21
from watson import search as watson
1✔
22

23
from events import models as events_models
1✔
24
from emails.generators import EventEmailGenerator
1✔
25

26

27
def maintenance(request):
    """Render the maintenance page with a title and ``noindex`` set in the template context."""
    return render(
        request,
        'maintenance.html',
        {'title': 'Down for maintenance', 'noindex': True},
    )
31

32

33
@login_required
# TODO: adjust for perm test
def search(request):
    """
    Run a Watson search for the ``q`` parameter and render the results page.

    The query is taken from the POST data when the request carries any, otherwise from the
    query string. A missing ``q`` falls back to an empty query.
    """
    params = request.POST if request.POST else request.GET
    try:
        query = params['q']
    except MultiValueDictKeyError:
        query = ""
    return render(request, 'search.html', {
        'query': query,
        'search_entry_list': watson.search(query),
    })
49

50

51
def serve_file(request, att_file, forced_name=None):
    """
    Send a stored file to the client as an attachment download.

    :param request: The incoming request; its ``If-Modified-Since`` header is honored
    :param att_file: File object exposing ``path`` and ``name`` (presumably a Django ``FieldFile`` - confirm)
    :param forced_name: Optional download name to use instead of ``att_file.name``
    :returns: ``HttpResponseNotModified`` when the client's copy is current, otherwise a ``FileResponse``
    """
    # Imported here so the function does not rely on a change to the module's import block.
    from urllib.parse import quote

    statobj = os.stat(att_file.path)
    if not was_modified_since(request.META.get('HTTP_IF_MODIFIED_SINCE'),
                              statobj.st_mtime, statobj.st_size):
        return HttpResponseNotModified()

    content_type, encoding = mimetypes.guess_type(att_file.path)
    content_type = content_type or 'application/octet-stream'
    response = FileResponse(att_file, content_type=content_type)
    response["Last-Modified"] = http_date(statobj.st_mtime)
    if stat.S_ISREG(statobj.st_mode):
        response["Content-Length"] = statobj.st_size
    if encoding:
        response["Content-Encoding"] = encoding

    # Only the final path component is exposed to the client.
    name = (forced_name or att_file.name).split('/')[-1]
    # The extended ``filename*`` parameter is percent-encoded (RFC 5987). Form-style encoding would turn
    # spaces into '+', which clients display literally, so use plain percent-encoding with no safe chars.
    response["Content-Disposition"] = 'attachment; filename="%s"; filename*=UTF-8\'\'%s' % \
                                      (str(name).replace('"', ''), quote(str(name), safe=''))
    return response
69

70

71
@require_GET
def workorderwizard_load(request):
    """
    Supply the workorder wizard with the data it needs on startup.

    :returns: JSON object with ``locations``, ``orgs`` and ``user`` entries; 401 status code if not authenticated
    """
    # @login_required is avoided on purpose: this view is called via AJAX, so a 302 redirect to the
    # login page would not work. A 401 lets the wizard redirect the user to log in itself.
    user = request.user
    if not user.is_authenticated:
        return HttpResponse('Unauthorized', status=401)

    user_info = {
        'name': user.get_full_name(),
        'email': user.email,
        'phone': user.phone,
        'address': user.addr,
    }

    locations = [
        {'id': loc.pk, 'name': loc.name, 'building': loc.building.name}
        for loc in events_models.Location.objects.filter(show_in_wo_form=True)
    ]

    orgs = []
    for org in events_models.Organization.objects.filter(archived=False):
        entry = {
            'id': org.pk,
            'name': org.name,
            'shortname': org.shortname,
            'owner': org.user_in_charge == user,
            'member': user in org.associated_users.all(),
            'delinquent': org.delinquent,
        }
        # Contact details are only included for users holding the view permission on this organization.
        if user.has_perm('events.view_org', org):
            entry['email'] = org.exec_email
            entry['phone'] = org.phone
            entry['address'] = org.address
        orgs.append(entry)

    payload = {'locations': locations, 'orgs': orgs, 'user': user_info}
    return HttpResponse(json.dumps(payload))
104

105

106
@require_POST
@csrf_exempt
@transaction.atomic
def workorderwizard_submit(request):
    """
    Handles submission of workorder from workorder wizard.

    The request body must be a JSON object containing ``org``, ``event_name``, ``location``, ``start``,
    ``end``, ``setup_complete`` and ``services``; ``description`` and ``extras`` are optional.
    When a 422 is returned, anything already written for the event is rolled back.

    :returns: URL to event page on success; 422 status code if processing fails; 401 status code if unauthenticated
    """
    # Manually checking if user is authenticated rather than using @login_required
    # in order to return a 401 status that the workorder wizard understands so it can display a specific error message
    # instead of returning a 302 redirect to the login page, which wouldn't work because this view is called via AJAX
    if not request.user.is_authenticated:
        return HttpResponse('Unauthorized', status=401)

    def unprocessable():
        # Returning normally from an atomic block commits it, which would keep a half-built event.
        # Flag the transaction for rollback before reporting the failure.
        transaction.set_rollback(True)
        return HttpResponse('Unprocessable Entity', status=422)

    # load JSON; a malformed or non-object payload is a client error, not a server error
    try:
        data = json.loads(request.body.decode('utf-8'))
    except ValueError:  # covers both JSONDecodeError and UnicodeDecodeError
        return unprocessable()
    if not isinstance(data, dict):
        return unprocessable()

    # check that all required fields are present
    mandatory_fields = ('org', 'event_name', 'location', 'start', 'end', 'setup_complete', 'services')
    if not all(key in data for key in mandatory_fields):
        return unprocessable()

    reversion.set_comment('Event submitted using work order wizard')

    # create event object and populate fields
    event = events_models.Event2019()
    event.submitted_by = request.user
    event.submitted_ip = request.META.get('REMOTE_ADDR')
    event.contact = request.user
    event.event_name = data['event_name']
    if 'description' in data:
        event.description = data['description']
    try:
        event.location = events_models.Location.objects.filter(show_in_wo_form=True).get(pk=data['location'])
    except (events_models.Location.DoesNotExist, ValueError, TypeError):
        # ValueError/TypeError: the supplied value is not usable as a primary key
        return unprocessable()
    try:
        event.datetime_setup_complete = parse_datetime(data['setup_complete'])
        event.datetime_start = parse_datetime(data['start'])
        event.datetime_end = parse_datetime(data['end'])
    except (ValueError, TypeError):
        # parse_datetime raises ValueError for well-formed but invalid dates and TypeError for non-strings
        return unprocessable()
    try:
        org = events_models.Organization.objects.get(pk=data['org'])
    except (events_models.Organization.DoesNotExist, ValueError, TypeError):
        return unprocessable()
    event.billing_org = org

    # populate many-to-many fields (the event must be saved before related objects can be attached)
    event.save()
    event.org.add(org)

    # add services
    for service_data in data['services']:
        if 'id' not in service_data:
            return unprocessable()
        try:
            service = events_models.Service.objects.filter(enabled_event2019=True).get(shortname=service_data['id'])
        except events_models.Service.DoesNotExist:
            return unprocessable()
        service_instance = events_models.ServiceInstance()
        service_instance.service = service
        service_instance.event = event
        if 'detail' in service_data:
            service_instance.detail = service_data['detail']
        service_instance.save()

    # add extras ('extras' is not a mandatory field, so treat a missing or null value as "no extras")
    for extra_data in data.get('extras') or []:
        if not all(key in extra_data for key in ('id', 'quantity')):
            return unprocessable()
        try:
            # only extras that belong to one of the services just added to this event are accepted
            extra = events_models.Extra.objects \
                .filter(disappear=False, services__in=event.serviceinstance_set.values_list('service', flat=True)) \
                .distinct().get(name=extra_data['id'])
        except events_models.Extra.DoesNotExist:
            return unprocessable()
        extra_instance = events_models.ExtraInstance()
        extra_instance.extra = extra
        extra_instance.event = event
        extra_instance.quant = extra_data['quantity']
        extra_instance.save()

    # send confirmation email
    email_body = 'You have successfully submitted the following event.'
    bcc = [settings.EMAIL_TARGET_VP, settings.EMAIL_TARGET_HP] if event.has_projection else [settings.EMAIL_TARGET_VP]
    email = EventEmailGenerator(event=event, subject='New Event Submitted', to_emails=[request.user.email],
                                body=email_body, bcc=bcc)
    email.send()

    # If the user does not have permission to submit events on behalf of the selected organization,
    # send an email to the organization to alert them that the event was submitted
    # if not request.user.has_perm('events.create_org_event', org):
    #     email_body = ('The following event was submitted. You are receiving this email because the user who submitted '
    #                   'this event is not expressly authorized to submit events on behalf of {}. The organization owner '
    #                   'can update authorized users at {}.'.format(org.name,
    #                   request.scheme + '://' + request.get_host() + reverse('my:org-edit', args=(org.pk,))))
    #     email = EventEmailGenerator(event=event, subject='Event Submitted on behalf of {}'.format(org.name),
    #                                 to_emails=[org.exec_email], body=email_body, bcc=[settings.EMAIL_TARGET_W])
    #     email.send()

    # return response with the URL to the event detail page
    return HttpResponse(json.dumps({'event_url': reverse('events:detail', args=[event.pk])}))
207

208

209
@require_POST
@csrf_exempt
def workorderwizard_findprevious(request):
    """
    Looks for a similarly named event held by the selected organization in the past 18 months (548 days)

    The request body must be a JSON object containing ``org``, ``event_name``, ``location``, ``start``,
    ``end`` and ``setup_complete``. Event names are compared with a fuzzy ratio; a score below 80 is
    treated as no match.

    :returns: JSON with the matched event's name, location pk, start time and services; 204 status code if \
    nothing is found or the user may not see the match; 422 status code if processing fails; 401 status \
    code if unauthenticated
    """
    # Imported here so the function does not rely on a change to the module's import block.
    from datetime import timedelta

    # Manually checking if user is authenticated rather than using @login_required
    # in order to return a 401 status that the workorder wizard understands so it can display a specific error message
    # instead of returning a 302 redirect to the login page, which wouldn't work because this view is called via AJAX
    if not request.user.is_authenticated:
        return HttpResponse('Unauthorized', status=401)

    # load JSON; a malformed or non-object payload is a client error, not a server error
    try:
        data = json.loads(request.body.decode('utf-8'))
    except ValueError:  # covers both JSONDecodeError and UnicodeDecodeError
        return HttpResponse('Unprocessable Entity', status=422)
    if not isinstance(data, dict):
        return HttpResponse('Unprocessable Entity', status=422)

    # check that all required fields are present
    mandatory_fields = ('org', 'event_name', 'location', 'start', 'end', 'setup_complete')
    if not all(key in data for key in mandatory_fields):
        return HttpResponse('Unprocessable Entity', status=422)

    # Search events that took place in the past 18 months for a match
    try:
        org = events_models.Organization.objects.get(pk=data['org'])
    except events_models.Organization.DoesNotExist:
        return HttpResponse('Unprocessable Entity', status=422)
    events_past_18_months = events_models.Event2019.objects.filter(
        org=org,
        test_event=False,
        datetime_start__gte=(timezone.now() - timedelta(days=548))
    )
    # Narrow the candidates to what this user is allowed to list for the organization
    if not request.user.has_perm('events.list_org_events', org):
        events_past_18_months = events_past_18_months.exclude(approved=False)
    if not request.user.has_perm('events.list_org_hidden_events', org):
        events_past_18_months = events_past_18_months.exclude(sensitive=True)
    event_names = events_past_18_months.values_list('event_name', flat=True).distinct()
    name_closest_match = process.extractOne(data['event_name'], event_names, scorer=fuzz.ratio)
    if not name_closest_match or name_closest_match[1] < 80:
        # no match
        return HttpResponse(status=204)
    # match found; when several events share the name, use the most recent one
    closest_match = events_past_18_months.filter(event_name=name_closest_match[0]).order_by('-datetime_start').first()
    if closest_match is None:
        # the matching event disappeared between the two queries
        return HttpResponse(status=204)
    if ((not closest_match.approved and not request.user.has_perm('events.view_events', closest_match))
            or (closest_match.sensitive and not request.user.has_perm('events.view_hidden_event', closest_match))):
        # match blocked by lack of user permission
        return HttpResponse(status=204)

    # Prepare response
    services_data = [
        {'id': service_instance.service.shortname, 'detail': service_instance.detail}
        for service_instance in closest_match.serviceinstance_set.all()
    ]
    return HttpResponse(json.dumps({
        'event_name': closest_match.event_name,
        'location': closest_match.location.pk,
        'start': str(closest_match.datetime_start),
        'services': services_data
    }))
271

272

273
def err403(request, *args, **kwargs):
    """Render the 403 template with an empty context and a 403 status; extra arguments are ignored."""
    return render(request, '403.html', {}, status=403)
276

277

278
def err404(request, *args, **kwargs):
    """Render the 404 template with a 404 status; extra arguments are ignored."""
    return render(
        request,
        '404.html',
        {'status': '404', 'error_message': 'Not Found'},
        status=404,
    )
281

282

283
def err500(request, *args, **kwargs):
    """Render the 500 template with an empty context and a 500 status; extra arguments are ignored."""
    return render(request, '500.html', {}, status=500)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc