• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

datamllab / tods / 4573166454

pending completion
4573166454

push

travis-pro

LSC2204
fix unit tests

6 of 6 new or added lines in 1 file covered. (100.0%)

13515 of 14488 relevant lines covered (93.28%)

1.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/tods/detection_algorithm/SystemWiseDetection_bkup.py
1
import os
×
2
from typing import Any,Optional,List
×
3
import statsmodels.api as sm
×
4
import numpy as np
×
5
from d3m import container, utils as d3m_utils
×
6
from d3m import utils
×
7

8
from numpy import ndarray
×
9
from collections import OrderedDict
×
10
from scipy import sparse
×
11
import os
×
12

13
import numpy
×
14
import typing
×
15
import time
×
16

17
from d3m import container
×
18
from d3m.primitive_interfaces import base, transformer
×
19

20
from d3m.container import DataFrame as d3m_dataframe
×
21
from d3m.metadata import hyperparams, params, base as metadata_base
×
22

23
from d3m.base import utils as base_utils
×
24
import uuid
×
25
from d3m.exceptions import PrimitiveNotFittedError
×
26

27
__all__ = ('SystemWiseDetectionPrimitive',)
×
28

29
Inputs = container.DataFrame
×
30
Outputs = container.DataFrame
×
31
from tods.utils import construct_primitive_metadata
×
32
class Params(params.Params): # pragma: no cover
       # Parameter container for the primitive, declared as a d3m ``params.Params`` subclass.
       # TODO: work out how to make these params dynamic.
       # NOTE(review): nothing in the visible part of this file reads or writes
       # ``use_column_names`` — confirm whether it is still needed.
       use_column_names: Optional[Any]
35

36

37

38
class Hyperparams(hyperparams.Hyperparams): # pragma: no cover
    # Hyperparameters of the system-wise detection primitive. The first three
    # drive the detection itself; the rest control column selection and how
    # the produced columns are combined with the input.

    # ---- Tuning parameters ----
    # NOTE(review): an older comment here said that -1 selects the entire time
    # series, but the declared default is 10 — confirm which is intended.
    window_size = hyperparams.Hyperparameter(
        default=10,
        semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter'],
        description="Window Size for decomposition",
    )

    method_type = hyperparams.Enumeration(
        values=[
            'max',
            'avg',
            'sliding_window_sum',
            'majority_voting_sliding_window_sum',
            'majority_voting_sliding_window_max',
        ],
        default='majority_voting_sliding_window_max',
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="The type of method used to find anomalous system",
    )

    contamination = hyperparams.Uniform(
        lower=0.,
        upper=0.5,
        default=0.1,
        description='The amount of contamination of the data set, i.e. the proportion of outliers in the data set. ',
        semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter'],
    )

    # ---- Control parameters: column selection ----
    use_columns = hyperparams.Set(
        elements=hyperparams.Hyperparameter[int](-1),
        default=(),
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="A set of column indices to force primitive to operate on. If any specified column cannot be parsed, it is skipped.",
    )

    exclude_columns = hyperparams.Set(
        elements=hyperparams.Hyperparameter[int](-1),
        default=(),
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description='A set of column indices to not operate on. Applicable only if "use_columns" is not provided.',
    )

    # ---- Control parameters: shape of the result ----
    return_result = hyperparams.Enumeration(
        values=['append', 'replace', 'new'],
        default='new',
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="Should parsed columns be appended, should they replace original columns, or should only parsed columns be returned? This hyperparam is ignored if use_semantic_types is set to false.",
    )

    use_semantic_types = hyperparams.UniformBool(
        default=False,
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="Controls whether semantic_types metadata will be used for filtering columns in input dataframe. Setting this to false makes the code ignore return_result and will produce only the output dataframe",
    )

    add_index_columns = hyperparams.UniformBool(
        default=False,
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description='Also include primary index columns if input data has them. Applicable only if "return_result" is set to "new".',
    )

    error_on_no_input = hyperparams.UniformBool(
        default=True,
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="Throw an exception if no input column is selected/provided. Defaults to true to behave like sklearn. To prevent pipelines from breaking set this to False.",
    )

    # Semantic type attached to every generated output column.
    return_semantic_type = hyperparams.Enumeration[str](
        values=[
            'https://metadata.datadrivendiscovery.org/types/Attribute',
            'https://metadata.datadrivendiscovery.org/types/ConstructedAttribute',
        ],
        default='https://metadata.datadrivendiscovery.org/types/Attribute',
        description='Decides what semantic type to attach to generated attributes',
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
    )
102

103

104

105
class SystemWiseDetectionPrimitive(transformer.TransformerPrimitiveBase[Inputs, Outputs, Hyperparams]): # pragma: no cover
106
    """
107
    Primitive to find anomalous systems from the outlier scores of each system's time series.
108

109
    Parameters
110
    ----------
111
    window_size :int(default=10)
112
        Window Size for decomposition
113

114
    method_type :str ('max', 'avg', 'sliding_window_sum','majority_voting_sliding_window_sum','majority_voting_sliding_window_max')
115
        The type of method used to find anomalous system
116

117
    contamination : float in (0., 0.5), optional (default=0.1)
118
           The amount of contamination of the data set, i.e. the proportion of outliers in the data set. 
119
    """
120

121
    metadata = construct_primitive_metadata(module='detection_algorithm', name='system_wise_detection', id='Sytem_Wise_Anomaly_Detection_Primitive', primitive_family='anomaly_detect', hyperparams=['window_size','method_type','contamination'], description='Sytem_Wise_Anomaly_Detection_Primitive')
122

123
    def __init__(self, *, hyperparams: Hyperparams) -> None:
        """
        Create the primitive.

        Args:
            hyperparams: Hyperparams instance forwarded to the base primitive
        """
        super().__init__(hyperparams=hyperparams)
        # Number embedded in the generated output column names
        # (passed to _add_target_columns_metadata by _wrap_predictions).
        self.primitiveNo = 0
126

127
    def produce(self, *, inputs: Inputs, timeout: float = None, iterations: int = None) -> base.CallResult[Outputs]:
        """
        Run system-wise detection on the selected input columns.

        Args:
            inputs: Container DataFrame
            timeout: Default (not used by this method)
            iterations: Default (not used by this method)
        Returns:
            CallResult wrapping the Container DataFrame that
            ``base_utils.combine_columns`` builds from the detection output
        Raises:
            RuntimeError: no input column was selected and the
                ``error_on_no_input`` hyperparameter is set
        """

        self.logger.info('System wise Detection Input  Primitive called')

        # Column selection is recomputed on every call; the results stay on the
        # instance under the same attribute names as before.
        self._fitted = False
        self._training_inputs, self._training_indices = self._get_columns_to_fit(inputs, self.hyperparams)
        self._input_column_names = self._training_inputs.columns
        self._fitted = len(self._training_indices) > 0

        output_columns = []
        if self._fitted:
            system_wise_detection_input = inputs
            if self.hyperparams['use_semantic_types']:
                system_wise_detection_input = inputs.iloc[:, self._training_indices]

            system_wise_detection_output = self._system_wise_detection(
                system_wise_detection_input,
                self.hyperparams["method_type"],
                self.hyperparams["window_size"],
                self.hyperparams["contamination"],
            )
            if sparse.issparse(system_wise_detection_output):
                system_wise_detection_output = system_wise_detection_output.toarray()

            output_columns = [self._wrap_predictions(inputs, system_wise_detection_output)]
        else:
            # Previously a PrimitiveNotFittedError was raised right after the
            # warning below, so error_on_no_input=False could never let the call
            # continue. The empty selection is now reported exactly once and, when
            # errors are disabled, handed on to combine_columns.
            if self.hyperparams['error_on_no_input']:
                raise RuntimeError("No input columns were selected")
            # Logger.warn is a deprecated alias of Logger.warning.
            self.logger.warning("No input columns were selected")

        self.logger.info('System wise Detection  Primitive returned')
        outputs = base_utils.combine_columns(return_result=self.hyperparams['return_result'],
                                             add_index_columns=self.hyperparams['add_index_columns'],
                                             inputs=inputs, column_indices=self._training_indices,
                                             columns_list=output_columns)
        return base.CallResult(outputs)
185

186
    @classmethod
    def _get_columns_to_fit(cls, inputs: Inputs, hyperparams: Hyperparams):
        """
        Pick the columns the primitive operates on.

        Args:
            inputs: Container DataFrame
            hyperparams: d3m.metadata.hyperparams.Hyperparams
        Returns:
            tuple of (DataFrame restricted to the chosen columns, list of chosen column indices)
        """
        if hyperparams['use_semantic_types']:
            inputs_metadata = inputs.metadata

            # get_columns_to_use also reports the rejected columns; only the
            # accepted ones are needed here.
            chosen, _ = base_utils.get_columns_to_use(
                inputs_metadata,
                use_columns=hyperparams['use_columns'],
                exclude_columns=hyperparams['exclude_columns'],
                can_use_column=lambda column_index: cls._can_produce_column(
                    inputs_metadata, column_index, hyperparams),
            )
            return inputs.iloc[:, chosen], chosen

        # Without semantic types every column is used, untouched.
        return inputs, list(range(len(inputs.columns)))
213

214
    @classmethod
    def _can_produce_column(cls, inputs_metadata: metadata_base.DataMetadata, column_index: int,
                            hyperparams: Hyperparams) -> bool:
        """
        Output whether a column can be processed.

        A column qualifies when its ``structural_type`` metadata is a subclass of
        int, float, numpy.integer or numpy.float64. Semantic types are not
        consulted: the earlier body returned True before its semantic-type checks,
        so those checks never ran and have been removed as dead code.

        Args:
            inputs_metadata: d3m.metadata.base.DataMetadata
            column_index: int
            hyperparams: d3m.metadata.hyperparams.Hyperparams (not used)
        Returns:
            bool
        """
        column_metadata = inputs_metadata.query((metadata_base.ALL_ELEMENTS, column_index))

        accepted_structural_types = (int, float, numpy.integer, numpy.float64)

        # A column without structural type information is rejected instead of
        # raising KeyError.
        structural_type = column_metadata.get('structural_type')
        if structural_type is None:
            return False

        return issubclass(structural_type, accepted_structural_types)
244

245
    @classmethod
    def _update_predictions_metadata(cls, inputs_metadata: metadata_base.DataMetadata, outputs: Optional[Outputs],
                                     target_columns_metadata: List[OrderedDict]) -> metadata_base.DataMetadata:
        """
        Build the metadata for the produced columns.

        Metadata is generated from ``outputs`` and then each entry of
        ``target_columns_metadata`` is applied to the column at the same position.

        Args:
            inputs_metadata: metadata_base.DataMetadata (not used)
            outputs: Container Dataframe
            target_columns_metadata: list with one metadata dict per output column
        Returns:
            d3m.metadata.base.DataMetadata
        """
        generated = metadata_base.DataMetadata().generate(value=outputs)

        for position, overrides in enumerate(target_columns_metadata):
            # Keep the structural type generated from the data. The key is
            # removed from the caller's dict as well.
            overrides.pop("structural_type", None)
            generated = generated.update_column(position, overrides)

        return generated
264

265
    def _wrap_predictions(self, inputs: Inputs, predictions: ndarray) -> Outputs:
        """
        Turn raw predictions into a container DataFrame with column metadata.

        Args:
            inputs: Container Dataframe whose metadata is handed to _update_predictions_metadata
            predictions: array-like data (n_samples, n_features)
        Returns:
            Dataframe
        """
        wrapped = d3m_dataframe(predictions, generate_metadata=True)

        # Names and semantic types for every generated column.
        per_column_metadata = self._add_target_columns_metadata(
            wrapped.metadata,
            self.hyperparams,
            self.primitiveNo,
        )
        wrapped.metadata = self._update_predictions_metadata(inputs.metadata, wrapped, per_column_metadata)

        return wrapped
279

280
    @classmethod
    def _add_target_columns_metadata(cls, outputs_metadata: metadata_base.DataMetadata, hyperparams, primitiveNo):
        """
        Create one metadata entry per output column.

        Each entry carries the semantic type chosen by the ``return_semantic_type``
        hyperparameter and a name made of the primitive's metadata name,
        ``primitiveNo`` and the column position.

        Args:
            outputs_metadata: metadata.base.DataMetadata
            hyperparams: d3m.metadata.hyperparams.Hyperparams
            primitiveNo: value embedded in every generated column name
        Returns:
            List[OrderedDict]
        """
        column_count = outputs_metadata.query((metadata_base.ALL_ELEMENTS,))['dimension']['length']

        def describe(position):
            generated_name = "{0}{1}_{2}".format(cls.metadata.query()['name'], primitiveNo, position)
            entry = OrderedDict()
            entry['semantic_types'] = [hyperparams["return_semantic_type"]]
            entry["name"] = str(generated_name)
            return entry

        return [describe(position) for position in range(column_count)]
303

304
    def _write(self, inputs: Inputs):
        """
        Write inputs to a CSV file named after the current time.time() value.

        Args:
            inputs: Container DataFrame
        """
        # Relative path, so the file is created in the current working directory.
        file_name = f"{time.time()}.csv"
        inputs.to_csv(file_name)
306

307
    def _system_wise_detection(self,X,method_type,window_size,contamination):
308
        #systemIds = X.system_id.unique()
309
        systemIds = [int(idx) for idx in X.index]
310
        #groupedX = X.groupby(X.system_id)
311
        print(systemIds)
312
        print(X.iloc[0])
313
        systemDf = X.iloc(systemIds[0])['system']
314
        print(systemDf)
315
        exit()
316

317
        transformed_X = []
318
        if(method_type=="max"):
319
            """
320
            Sytems are sorted based on maximum of reconstruction errors"
321
            """
322
            maxOutlierScorePerSystemList = []
323
            for systemId in systemIds:
324
                systemDf = groupedX.get_group(systemId)
325
                #systemDf = X[systemId]['system']
326
                maxOutlierScorePerSystemList.append(np.max(np.abs(systemDf["value_0"].values)))
327

328
            ranking = np.sort(maxOutlierScorePerSystemList)
329
            threshold = ranking[int((1 - contamination) * len(ranking))]
330
            self.threshold = threshold
331
            mask = (maxOutlierScorePerSystemList >= threshold)
332
            ranking[mask] = 1
333
            ranking[np.logical_not(mask)] = 0
334
            for iter in range(len(systemIds)):
335
                transformed_X.append([systemIds[iter],ranking[iter]])
336

337
        if (method_type == "avg"):
338
            """
339
            Sytems are sorted based on average of reconstruction errors"
340
            """
341
            avgOutlierScorePerSystemList = []
342
            for systemId in systemIds:
343
                systemDf = groupedX.get_group(systemId)
344
                avgOutlierScorePerSystemList.append(np.mean(np.abs(systemDf["value_0"].values)))
345

346
            ranking = np.sort(avgOutlierScorePerSystemList)
347
            threshold = ranking[int((1 - contamination) * len(ranking))]
348
            self.threshold = threshold
349
            mask = (avgOutlierScorePerSystemList >= threshold)
350
            ranking[mask] = 1
351
            ranking[np.logical_not(mask)] = 0
352
            for iter in range(len(systemIds)):
353
                transformed_X.append([systemIds[iter], ranking[iter]])
354

355
        if (method_type == "sliding_window_sum"):
356
            """
357
            Sytems are sorted based on max of max of reconstruction errors in each window"
358
            """
359
            OutlierScorePerSystemList = []
360
            for systemId in systemIds:
361
                systemDf = groupedX.get_group(systemId)
362
                column_value = systemDf["value_0"].values
363
                column_score = np.zeros(len(column_value))
364
                for iter in range(window_size - 1, len(column_value)):
365
                    sequence = column_value[iter - window_size + 1:iter + 1]
366
                    column_score[iter] = np.sum(np.abs(sequence))
367
                column_score[:window_size - 1] = column_score[window_size - 1]
368
                OutlierScorePerSystemList.append(column_score.tolist())
369
            OutlierScorePerSystemList = np.asarray(OutlierScorePerSystemList)
370

371
            maxOutlierScorePerSystemList = OutlierScorePerSystemList.max(axis=1).tolist()
372

373
            ranking = np.sort(maxOutlierScorePerSystemList)
374
            threshold = ranking[int((1 - contamination) * len(ranking))]
375
            self.threshold = threshold
376
            mask = (maxOutlierScorePerSystemList >= threshold)
377
            ranking[mask] = 1
378
            ranking[np.logical_not(mask)] = 0
379
            for iter in range(len(systemIds)):
380
                transformed_X.append([systemIds[iter], ranking[iter]])
381

382
        if (method_type == "majority_voting_sliding_window_sum"):
383
            """
384
            Sytem with most vote based on max of sum of reconstruction errors in each window
385
            """
386
            OutlierScorePerSystemList = []
387
            for systemId in systemIds:
388
                systemDf = groupedX.get_group(systemId)
389
                column_value = systemDf["value_0"].values
390
                column_score = np.zeros(len(column_value))
391
                for iter in range(window_size - 1, len(column_value)):
392
                    sequence = column_value[iter - window_size + 1:iter + 1]
393
                    column_score[iter] = np.sum(np.abs(sequence))
394
                column_score[:window_size - 1] = column_score[window_size - 1]
395
                OutlierScorePerSystemList.append(column_score.tolist())
396
            OutlierScorePerSystemList = np.asarray(OutlierScorePerSystemList)
397
            OutlierScorePerSystemList = (
398
                    OutlierScorePerSystemList == OutlierScorePerSystemList.max(axis=0)[None, :]).astype(int)
399

400
            maxOutlierScorePerSystemList = OutlierScorePerSystemList.sum(axis=1).tolist()
401

402
            ranking = np.sort(maxOutlierScorePerSystemList)
403
            threshold = ranking[int((1 - contamination) * len(ranking))]
404
            self.threshold = threshold
405
            mask = (maxOutlierScorePerSystemList >= threshold)
406
            ranking[mask] = 1
407
            ranking[np.logical_not(mask)] = 0
408
            for iter in range(len(systemIds)):
409
                transformed_X.append([systemIds[iter], ranking[iter]])
410

411
        if (method_type == "majority_voting_sliding_window_max"):
412
            """
413
            Sytem with most vote based on max of max of reconstruction errors in each window
414
            """
415
            OutlierScorePerSystemList = []
416
            for systemId in systemIds:
417
                systemDf = groupedX.get_group(systemId)
418
                column_value = systemDf["value_0"].values
419
                column_score = np.zeros(len(column_value))
420
                for iter in range(window_size - 1, len(column_value)):
421
                    sequence = column_value[iter - window_size + 1:iter + 1]
422
                    column_score[iter] = np.max(np.abs(sequence))
423
                column_score[:window_size - 1] = column_score[window_size - 1]
424
                OutlierScorePerSystemList.append(column_score.tolist())
425
            OutlierScorePerSystemList = np.asarray(OutlierScorePerSystemList)
426
            OutlierScorePerSystemList = (
427
                    OutlierScorePerSystemList == OutlierScorePerSystemList.max(axis=0)[None, :]).astype(int)
428

429
            maxOutlierScorePerSystemList = OutlierScorePerSystemList.sum(axis=1).tolist()
430

431
            ranking = np.sort(maxOutlierScorePerSystemList)
432
            threshold = ranking[int((1 - contamination) * len(ranking))]
433
            self.threshold = threshold
434
            mask = (maxOutlierScorePerSystemList >= threshold)
435
            ranking[mask] = 1
436
            ranking[np.logical_not(mask)] = 0
437
            for iter in range(len(systemIds)):
438
                transformed_X.append([systemIds[iter], ranking[iter]])
439

440
        return transformed_X
441

442

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc