How to use list_activities method in localstack

Best Python code snippet using localstack_python

log_disconnected.py

Source:log_disconnected.py Github

copy

Full Screen

1'''2 This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).3 PM4Py is free software: you can redistribute it and/or modify4 it under the terms of the GNU General Public License as published by5 the Free Software Foundation, either version 3 of the License, or6 (at your option) any later version.7 PM4Py is distributed in the hope that it will be useful,8 but WITHOUT ANY WARRANTY; without even the implied warranty of9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the10 GNU General Public License for more details.11 You should have received a copy of the GNU General Public License12 along with PM4Py. If not, see <https://www.gnu.org/licenses/>.13'''14from enum import Enum15from pm4py.objects.log.util import sorting16from pm4py.util import constants, exec_utils17from pm4py.util import points_subset18from pm4py.util import xes_constants as xes19from pm4py.objects.log.util import basic_filter20from typing import Optional, Dict, Any, Union, Tuple, List21from pm4py.objects.log.obj import EventLog, EventStream22class Parameters(Enum):23 ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY24 TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY25 CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY26 ATTRIBUTE_KEY = constants.PARAMETER_CONSTANT_ATTRIBUTE_KEY27 PARAMETER_SAMPLE_SIZE = "sample_size"28 SORT_LOG_REQUIRED = "sort_log_required"29def apply(log: EventLog, list_activities: List[str], sample_size: int, parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> Dict[str, Any]:30 """31 Finds the disconnected performance spectrum provided a log32 and a list of activities33 Parameters34 -------------35 log36 Log37 list_activities38 List of activities interesting for the performance spectrum (at least two)39 sample_size40 Size of the sample41 parameters42 Parameters of the algorithm, including:43 - Parameters.ACTIVITY_KEY44 - Parameters.TIMESTAMP_KEY45 Returns46 -------------47 points48 Points of the performance spectrum49 """50 if parameters is None:51 parameters = {}52 sort_log_required = exec_utils.get_param_value(Parameters.SORT_LOG_REQUIRED, parameters, True)53 all_acti_combs = set(tuple(list_activities[j:j + i]) for i in range(2, len(list_activities) + 1) for j in54 range(0, len(list_activities) - i + 1))55 two_acti_combs = set((list_activities[i], list_activities[i + 1]) for i in range(len(list_activities) - 1))56 activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes.DEFAULT_NAME_KEY)57 timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters, xes.DEFAULT_TIMESTAMP_KEY)58 case_id_key = exec_utils.get_param_value(Parameters.CASE_ID_KEY, parameters, xes.DEFAULT_TRACEID_KEY)59 parameters[Parameters.ATTRIBUTE_KEY] = activity_key60 log = basic_filter.filter_log_events_attr(log, list_activities, parameters=parameters)61 if sort_log_required:62 log = sorting.sort_timestamp_log(log, timestamp_key=timestamp_key)63 points = []64 for trace in log:65 matches = [(i, i + 1) for i in range(len(trace) - 1) if66 (trace[i][activity_key], trace[i + 1][activity_key]) in two_acti_combs]67 i = 068 while i < len(matches) - 1:69 matchAct = (trace[mi][activity_key] for mi in (matches[i] + matches[i + 1][1:]))70 if matches[i][-1] == matches[i + 1][0] and matchAct in all_acti_combs:71 matches[i] = matches[i] + matches[i + 1][1:]72 del matches[i + 1]73 i = 074 else:75 i += 176 if matches:77 matches = set(matches)78 timest_comb = [{'points': [(trace[i][activity_key], trace[i][timestamp_key].timestamp()) for i in match]}79 for match in matches]80 for p in timest_comb:81 p['case_id'] = trace.attributes[case_id_key]82 points += timest_comb83 points = sorted(points, key=lambda x: min(x['points'], key=lambda x: x[1])[1])84 if len(points) > sample_size:85 points = points_subset.pick_chosen_points_list(sample_size, points)...

Full Screen

Full Screen

algorithm.py

Source:algorithm.py Github

copy

Full Screen

1'''2 This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).3 PM4Py is free software: you can redistribute it and/or modify4 it under the terms of the GNU General Public License as published by5 the Free Software Foundation, either version 3 of the License, or6 (at your option) any later version.7 PM4Py is distributed in the hope that it will be useful,8 but WITHOUT ANY WARRANTY; without even the implied warranty of9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the10 GNU General Public License for more details.11 You should have received a copy of the GNU General Public License12 along with PM4Py. If not, see <https://www.gnu.org/licenses/>.13'''14from pm4py.algo.discovery.performance_spectrum.variants import dataframe, log, dataframe_disconnected, log_disconnected15from pm4py.util import exec_utils16import pkgutil17from enum import Enum18from pm4py.util import constants19from typing import Optional, Dict, Any, Union, Tuple, List20from pm4py.objects.log.obj import EventLog, EventStream21import pandas as pd22class Parameters(Enum):23 ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY24 TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY25 CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY26 ATTRIBUTE_KEY = constants.PARAMETER_CONSTANT_ATTRIBUTE_KEY27 PARAMETER_SAMPLE_SIZE = "sample_size"28class Outputs(Enum):29 LIST_ACTIVITIES = "list_activities"30 POINTS = "points"31class Variants(Enum):32 DATAFRAME = dataframe33 LOG = log34 DATAFRAME_DISCONNECTED = dataframe_disconnected35 LOG_DISCONNECTED = log_disconnected36def apply(log: Union[EventLog, EventStream, pd.DataFrame], list_activities: List[str], variant=None, parameters: Optional[Dict[Any, Any]] = None) -> Dict[str, Any]:37 """38 Finds the performance spectrum provided a log/dataframe39 and a list of activities40 Parameters41 -------------42 log43 Event log/Dataframe44 list_activities45 List of activities interesting for the performance spectrum (at least two)46 variant47 Variant to be used (see Variants Enum)48 parameters49 Parameters of the algorithm, including:50 - Parameters.ACTIVITY_KEY51 - Parameters.TIMESTAMP_KEY52 Returns53 -------------54 ps55 Performance spectrum object (dictionary)56 """57 from pm4py.objects.conversion.log import converter as log_conversion58 if parameters is None:59 parameters = {}60 sample_size = exec_utils.get_param_value(Parameters.PARAMETER_SAMPLE_SIZE, parameters, 10000)61 if len(list_activities) < 2:62 raise Exception("performance spectrum can be applied providing at least two activities!")63 points = None64 if pkgutil.find_loader("pandas"):65 import pandas as pd66 if type(log) is pd.DataFrame:67 if variant is None:68 variant = Variants.DATAFRAME69 points = exec_utils.get_variant(variant).apply(log, list_activities, sample_size, parameters)70 if points is None:71 if variant is None:72 variant = Variants.LOG73 points = exec_utils.get_variant(variant).apply(log_conversion.apply(log), list_activities, sample_size,74 parameters)75 ps = {Outputs.LIST_ACTIVITIES.value: list_activities, Outputs.POINTS.value: points}...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful