
datamllab / tods, build 4573166454 (push, via travis-pro): pending completion
LSC2204: "fix unit tests"

6 of 6 new or added lines in 1 file covered (100.0%)
13515 of 14488 relevant lines covered (93.28%)
1.87 hits per line

Source File: /tods/detection_algorithm/Ensemble.py (54.74% covered)

from typing import Any, Callable, List, Dict, Union, Optional, Sequence, Tuple
from collections import OrderedDict
import os
import typing
import uuid

import numpy
from numpy import ndarray
import pandas as pd
import sklearn
from scipy import sparse

# Custom import commands if any
from sklearn.preprocessing import Normalizer

from d3m import container, utils as d3m_utils
from d3m.base import utils as base_utils
from d3m.container import DataFrame as d3m_dataframe
from d3m.container.numpy import ndarray as d3m_ndarray
from d3m.exceptions import PrimitiveNotFittedError
from d3m.metadata import hyperparams, params, base as metadata_base
from d3m.primitive_interfaces import base, transformer
from d3m.primitive_interfaces.base import CallResult, DockerContainer
from d3m.primitive_interfaces.unsupervised_learning import UnsupervisedLearnerPrimitiveBase

from tods.utils import construct_primitive_metadata

Inputs = d3m_dataframe
Outputs = d3m_dataframe

class Params(params.Params):
    input_column_names: Optional[Any]
    target_names_: Optional[Sequence[Any]]
    training_indices_: Optional[Sequence[int]]
    target_column_indices_: Optional[Sequence[int]]
    target_columns_metadata_: Optional[List[OrderedDict]]


class Hyperparams(hyperparams.Hyperparams):
    # Added by Mia
    endog = hyperparams.Bounded[int](
        lower=2,
        upper=None,
        default=3,
        description='Array-like time series.',
        semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter']
    )

    # Score threshold used when flagging anomalous points.
    threshold = hyperparams.Bounded[float](
        lower=0,
        upper=1,
        default=0.5,
        semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter']
    )

    # keep previous
    norm = hyperparams.Enumeration[str](
        default='l2',
        values=['l1', 'l2', 'max'],
        description='The norm to use to normalize each non-zero sample.',
        semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter']
    )

    use_columns = hyperparams.Set(
        elements=hyperparams.Hyperparameter[int](-1),
        default=(),
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="A set of column indices to force primitive to operate on. If any specified column cannot be parsed, it is skipped.",
    )
    exclude_columns = hyperparams.Set(
        elements=hyperparams.Hyperparameter[int](-1),
        default=(),
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="A set of column indices to not operate on. Applicable only if \"use_columns\" is not provided.",
    )
    return_result = hyperparams.Enumeration(
        values=['append', 'replace', 'new'],
        default='new',
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="Should parsed columns be appended, should they replace original columns, or should only parsed columns be returned? This hyperparam is ignored if use_semantic_types is set to false.",
    )
    use_semantic_types = hyperparams.UniformBool(
        default=False,
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="Controls whether semantic_types metadata will be used for filtering columns in the input dataframe. Setting this to false makes the code ignore return_result and produce only the output dataframe.",
    )

    add_index_columns = hyperparams.UniformBool(
        default=False,
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="Also include primary index columns if input data has them. Applicable only if \"return_result\" is set to \"new\".",
    )
    error_on_no_input = hyperparams.UniformBool(
        default=True,
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="Throw an exception if no input column is selected/provided. Defaults to true to behave like sklearn. To prevent pipelines from breaking, set this to False.",
    )

    return_semantic_type = hyperparams.Enumeration[str](
        values=['https://metadata.datadrivendiscovery.org/types/Attribute', 'https://metadata.datadrivendiscovery.org/types/ConstructedAttribute'],
        default='https://metadata.datadrivendiscovery.org/types/Attribute',
        description='Decides what semantic type to attach to generated attributes.',
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter']
    )
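
As an aside, here is a minimal sketch of how these hyperparameters are typically constructed in d3m code. It assumes a current d3m core package (defaults() and replace() are the standard Hyperparams helpers) and that this module is importable as tods.detection_algorithm.Ensemble:

    # Sketch only: build the defaults, then override selected values.
    from tods.detection_algorithm.Ensemble import Hyperparams

    hp = Hyperparams.defaults()                        # every hyperparameter at its default
    hp = hp.replace({'threshold': 0.7, 'norm': 'l1'})  # returns a new, modified instance
    print(hp['threshold'])  # 0.7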

class EnsemblePrimitive(UnsupervisedLearnerPrimitiveBase[Inputs, Outputs, Params, Hyperparams]):
    """
    Ensemble method.
    Computes the maximum/minimum/average and the majority vote over the detection
    algorithms' scores, flagging each point against the configured score threshold.

    Parameters
    ----------
    endog : int (lower=2, upper=None, default=3)
        Array-like time series.
    threshold : float (lower=0, upper=1, default=0.5)
        Score threshold for counting a point as anomalous.
    norm : str (default='l2', values=['l1', 'l2', 'max'])
        The norm to use to normalize each non-zero sample.
    use_columns : Set
        A set of column indices to force the primitive to operate on. If any specified column cannot be parsed, it is skipped.
    exclude_columns : Set
        A set of column indices to not operate on. Applicable only if "use_columns" is not provided.
    return_result : Enumeration
        Should parsed columns be appended, should they replace original columns, or should only parsed columns be returned? This hyperparam is ignored if use_semantic_types is set to false.
    use_semantic_types : Bool
        Controls whether semantic_types metadata will be used for filtering columns in the input dataframe. Setting this to false makes the code ignore return_result and produce only the output dataframe.
    add_index_columns : Bool
        Also include primary index columns if the input data has them. Applicable only if "return_result" is set to "new".
    error_on_no_input : Bool
        Throw an exception if no input column is selected/provided. Defaults to true to behave like sklearn. To prevent pipelines from breaking, set this to False.
    return_semantic_type : Enumeration[str]
        Decides what semantic type to attach to generated attributes.
    """

    metadata = construct_primitive_metadata(module='detection_algorithm', name='Ensemble', id='EnsemblePrimitive', primitive_family='anomaly_detect', hyperparams=['use_columns'], description='Ensemble')

    def __init__(self, *,  # pragma: no cover
                 hyperparams: Hyperparams,
                 random_seed: int = 0,
                 docker_containers: Dict[str, DockerContainer] = None) -> None:

        super().__init__(hyperparams=hyperparams, random_seed=random_seed, docker_containers=docker_containers)

        self._clf = Normalizer(
            norm=self.hyperparams['norm'],
        )

        self._inputs = None
        self._outputs = None
        self._training_inputs = None
        self._training_outputs = None
        self._target_names = None
        self._training_indices = None
        self._target_column_indices = None
        self._target_columns_metadata: List[OrderedDict] = None
        self._input_column_names = None
        self._fitted = False

    def set_training_data(self, *, inputs: Inputs) -> None:  # pragma: no cover
        self._inputs = inputs
        self._fitted = False

    def fit(self, *, timeout: float = None, iterations: int = None) -> CallResult[None]:  # pragma: no cover
        if self._fitted:
            return CallResult(None)

        self._training_inputs, self._training_indices = self._get_columns_to_fit(self._inputs, self.hyperparams)

        # Bail out before touching .columns if no training data was selected.
        if self._training_inputs is None:
            return CallResult(None)

        self._input_column_names = self._training_inputs.columns

        if len(self._training_indices) > 0:
            self._clf.fit(self._training_inputs)
            self._fitted = True
        else:
            if self.hyperparams['error_on_no_input']:
                raise RuntimeError("No input columns were selected")
            self.logger.warning("No input columns were selected")
        return CallResult(None)

    def produce(self, *, inputs: Inputs, timeout: float = None, iterations: int = None) -> base.CallResult[Outputs]:  # pragma: no cover

        # Rename the input columns in place; the input is assumed to have
        # exactly these four columns, in this order.
        outputs = inputs
        outputs.columns = ['timestamp', 'value', 'system_id', 'scores']

        # Earlier max/min/mean aggregations, kept for reference:
        # ensemble_max = outputs.groupby('system_id')[outputs.columns[-1]].max()
        # ensemble_min = outputs.groupby('system_id')[outputs.columns[-1]].min()
        # outputs_mean = outputs.groupby('system_id')[outputs.columns[3]].mean()

        # Binarize the anomaly scores. Note the hardcoded 0.05 cutoff; the
        # 'threshold' hyperparameter is not consulted here.
        outputs['results'] = numpy.where(outputs['scores'] > 0.05, 1, 0)
        print(outputs)

        # Majority vote per system: how many points each system flags.
        outputs_xy = outputs.groupby('system_id')['results'].sum().reset_index()
        print("*****majority_sum_xy*****")
        print(outputs_xy)

        # Vote counts per (timestamp, system) and per (system, value).
        outputs_sum_x = outputs.groupby(['timestamp', 'system_id'])['results'].sum()
        outputs_sum_y = outputs.groupby(['system_id', 'value'])['results'].sum()

        print('*****majority_max_x*****')
        print(outputs_sum_x)
        print('*****majority_max_y*****')
        print(outputs_sum_y)

        return base.CallResult(outputs)
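
To make the voting step concrete, here is a minimal standalone sketch of the same aggregation on hypothetical sample data, using plain pandas with no d3m dependency:

    # Majority-vote aggregation as in produce(), on made-up data.
    import numpy
    import pandas as pd

    df = pd.DataFrame({
        'timestamp': [1, 1, 2, 2],
        'value':     [0.1, 0.9, 0.2, 0.8],
        'system_id': [1, 2, 1, 2],
        'scores':    [0.01, 0.30, 0.02, 0.70],
    })
    df['results'] = numpy.where(df['scores'] > 0.05, 1, 0)  # same hardcoded cutoff
    votes = df.groupby('system_id')['results'].sum().reset_index()
    print(votes)  # system 1 casts 0 votes, system 2 casts 2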

    def _update_metadata(self, outputs):  # pragma: no cover
        outputs.metadata = outputs.metadata.generate(outputs)

    def get_params(self) -> Params:  # pragma: no cover
        # The same Params are returned whether or not the primitive is fitted.
        return Params(
            input_column_names=self._input_column_names,
            training_indices_=self._training_indices,
            target_names_=self._target_names,
            target_column_indices_=self._target_column_indices,
            target_columns_metadata_=self._target_columns_metadata
        )

    def set_params(self, *, params: Params) -> None:  # pragma: no cover
        self._input_column_names = params['input_column_names']
        self._training_indices = params['training_indices_']
        self._target_names = params['target_names_']
        self._target_column_indices = params['target_column_indices_']
        self._target_columns_metadata = params['target_columns_metadata_']
        self._fitted = True

    @classmethod
    def _get_columns_to_fit(cls, inputs: Inputs, hyperparams: Hyperparams):  # pragma: no cover
        if not hyperparams['use_semantic_types']:
            return inputs, list(range(len(inputs.columns)))

        inputs_metadata = inputs.metadata

        def can_produce_column(column_index: int) -> bool:
            return cls._can_produce_column(inputs_metadata, column_index, hyperparams)

        columns_to_produce, columns_not_to_produce = base_utils.get_columns_to_use(
            inputs_metadata,
            use_columns=hyperparams['use_columns'],
            exclude_columns=hyperparams['exclude_columns'],
            can_use_column=can_produce_column,
        )
        return inputs.iloc[:, columns_to_produce], columns_to_produce

    @classmethod
    def _can_produce_column(cls, inputs_metadata: metadata_base.DataMetadata, column_index: int, hyperparams: Hyperparams) -> bool:  # pragma: no cover
        column_metadata = inputs_metadata.query((metadata_base.ALL_ELEMENTS, column_index))

        accepted_structural_types = (int, float, numpy.integer, numpy.float64)
        accepted_semantic_types = set()
        accepted_semantic_types.add("https://metadata.datadrivendiscovery.org/types/Attribute")
        if not issubclass(column_metadata['structural_type'], accepted_structural_types):
            return False

        semantic_types = set(column_metadata.get('semantic_types', []))

        if len(semantic_types) == 0:
            cls.logger.warning("No semantic types found in column metadata")
            return False

        # Making sure all accepted_semantic_types are available in semantic_types
        return len(accepted_semantic_types - semantic_types) == 0

    @classmethod
    def _get_target_columns_metadata(cls, outputs_metadata: metadata_base.DataMetadata, hyperparams) -> List[OrderedDict]:  # pragma: no cover
        outputs_length = outputs_metadata.query((metadata_base.ALL_ELEMENTS,))['dimension']['length']

        target_columns_metadata: List[OrderedDict] = []
        for column_index in range(outputs_length):
            column_metadata = OrderedDict(outputs_metadata.query_column(column_index))

            # Update semantic types and prepare it for predicted targets.
            semantic_types = set(column_metadata.get('semantic_types', []))
            semantic_types_to_remove = set()
            add_semantic_types = set()  # must be a set: lists have no .add()
            add_semantic_types.add(hyperparams["return_semantic_type"])
            semantic_types = semantic_types - semantic_types_to_remove
            semantic_types = semantic_types.union(add_semantic_types)
            column_metadata['semantic_types'] = list(semantic_types)

            target_columns_metadata.append(column_metadata)

        return target_columns_metadata

    @classmethod
    def _update_predictions_metadata(cls, inputs_metadata: metadata_base.DataMetadata, outputs: Optional[Outputs],  # pragma: no cover
                                     target_columns_metadata: List[OrderedDict]) -> metadata_base.DataMetadata:
        outputs_metadata = metadata_base.DataMetadata().generate(value=outputs)

        for column_index, column_metadata in enumerate(target_columns_metadata):
            column_metadata.pop("structural_type", None)
            outputs_metadata = outputs_metadata.update_column(column_index, column_metadata)

        return outputs_metadata

    def _wrap_predictions(self, inputs: Inputs, predictions: ndarray) -> Outputs:  # pragma: no cover
        outputs = d3m_dataframe(predictions, generate_metadata=True)
        target_columns_metadata = self._copy_inputs_metadata(inputs.metadata, self._training_indices, outputs.metadata, self.hyperparams)
        outputs.metadata = self._update_predictions_metadata(inputs.metadata, outputs, target_columns_metadata)
        return outputs

    @classmethod
    def _copy_inputs_metadata(cls, inputs_metadata: metadata_base.DataMetadata, input_indices: List[int],  # pragma: no cover
                              outputs_metadata: metadata_base.DataMetadata, hyperparams):
        outputs_length = outputs_metadata.query((metadata_base.ALL_ELEMENTS,))['dimension']['length']
        target_columns_metadata: List[OrderedDict] = []
        for column_index in input_indices:
            column_name = inputs_metadata.query((metadata_base.ALL_ELEMENTS, column_index)).get("name")
            if column_name is None:
                column_name = "output_{}".format(column_index)

            column_metadata = OrderedDict(inputs_metadata.query_column(column_index))
            semantic_types = set(column_metadata.get('semantic_types', []))
            semantic_types_to_remove = set()
            add_semantic_types = set()
            add_semantic_types.add(hyperparams["return_semantic_type"])
            semantic_types = semantic_types - semantic_types_to_remove
            semantic_types = semantic_types.union(add_semantic_types)
            column_metadata['semantic_types'] = list(semantic_types)

            column_metadata["name"] = str(column_name)
            target_columns_metadata.append(column_metadata)

        # If outputs has more columns than the index, add the Attribute type to all remaining columns.
        if outputs_length > len(input_indices):
            for column_index in range(len(input_indices), outputs_length):
                column_metadata = OrderedDict()
                semantic_types = set()
                semantic_types.add(hyperparams["return_semantic_type"])
                column_name = "output_{}".format(column_index)
                column_metadata["semantic_types"] = list(semantic_types)
                column_metadata["name"] = str(column_name)
                target_columns_metadata.append(column_metadata)

        return target_columns_metadata


# EnsemblePrimitive.__doc__ = Normalizer.__doc__
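
For context, a minimal end-to-end sketch of driving this primitive outside a full pipeline. It assumes tods and d3m are installed, that the module is importable as tods.detection_algorithm.Ensemble, and it follows the four-column layout produce() expects; this is an illustration, not the documented API:

    # Usage sketch under the assumptions above.
    import pandas as pd
    from d3m.container import DataFrame as d3m_dataframe
    from tods.detection_algorithm.Ensemble import EnsemblePrimitive, Hyperparams

    data = d3m_dataframe(pd.DataFrame({
        'timestamp': [1, 2, 3, 4],
        'value':     [0.2, 0.9, 0.3, 0.8],
        'system_id': [1, 1, 2, 2],
        'scores':    [0.01, 0.60, 0.04, 0.90],
    }), generate_metadata=True)

    primitive = EnsemblePrimitive(hyperparams=Hyperparams.defaults())
    primitive.set_training_data(inputs=data)
    primitive.fit()
    result = primitive.produce(inputs=data).value  # adds the binary 'results' column
    print(result)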