Best Python code snippet using playwright-python
StockAnalysisTool_UI.py
Source:StockAnalysisTool_UI.py  
1from os import sep2from enum import IntEnum, Enum3from datetime import datetime, timedelta4from pandas import DataFrame5from typing import Union6import dateutil.relativedelta7from PyQt5.QtCore import Qt, QMetaObject, QCoreApplication8from PyQt5.QtGui import QFont, QColor, QFontMetrics9from PyQt5.QtWidgets import QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QFrame, QComboBox, QCheckBox, \10                            QPushButton, QSizePolicy, QStatusBar, QApplication, QGridLayout, QTextEdit11from qdarkstyle import load_stylesheet12import pyqtgraph as pg13from utilities.FeatureEngineering import chande_momentum_oscillator, WilliamsR, CMO, MACD, ROC, WMA, HMA, TRIX, \14                                         CCI, DPO, CMF, ADX, ForceIndex15from MarketInterfaces.MarketDataInterface import retrieve_yahoo_fin_stock_data16from ta.momentum import rsi, wr, roc17from ta.trend import macd_signal, macd, cci, dpo, adx18from ta.volume import chaikin_money_flow19class Ui_StockAnalysisTool(QMainWindow):20    def __init__(self, relative_path_correction: str = ""):21        super(Ui_StockAnalysisTool, self).__init__()22        # TODO23        self.valid_stock_tickers = ["AMZN"]24        # Initialize Object Attributes25        self.source = list(Ui_StockAnalysisTool.StockSources)[0]26        self.sample_rates = [sample_rate.value for sample_rate in Ui_StockAnalysisTool.SampleRates]27        self.time_deltas = [time_delta.value for time_delta in Ui_StockAnalysisTool.TimeDeltas]28        self.stock = self.valid_stock_tickers[0]29        self.df = retrieve_yahoo_fin_stock_data(ticker=self.stock)30        self.first_date = self.df.index[0]31        self.last_date = self.df.index[-1]32        self.data_lines = {}33        self.indicators = {}34        self.colors = {35            'blue': QColor(0, 0, 255, 255),36            'cyan': QColor(0, 255, 255, 255),37            'magenta': QColor(255, 0, 255, 255),38            'yellow': QColor(255, 255, 0, 255),39            'white': 
QColor(255, 255, 255, 255),40            'dark-gray': QColor(125, 125, 125, 255),41            'light-gray': QColor(200, 200, 200, 255),42            'gray': QColor(100, 100, 150, 255),43            'orange': QColor(255, 165, 0, 255),44            'salmon': QColor(250, 128, 144, 255),45            'violet': QColor(230, 130, 238, 255),46            'aqua-marine': QColor(127, 255, 212, 255)47        }48        self.color_iterator = 049        self.protected_colors = {50            'green': QColor(0, 255, 0, 255),51            'red': QColor(255, 0, 0, 255)52        }53        self.robinhood_account_name = ""54        self.robinhood_account_password = ""55        self.rsi_n = 1456        self.cmo_n = 757        self.macd_slow = 2658        self.macd_fast = 1259        self.macd_sign = 960        self.roc_n = 1261        self.cci_n = 2062        self.dpo_n = 2063        self.cmf_n = 2064        self.adx_n = 1465        # Initialize UI Attributes. Then Initialize UI66        self.central_widget = None67        self.central_layout = None68        self.title_label = None69        self.title_h_divider = None70        self.display_layout = None71        self.graph_layout = None72        self.graph = None73        self.date_axis = None74        self.graph_legend = None75        self.data_selection_layout = None76        self.open_data_cb = None77        self.high_data_cb = None78        self.low_data_cb = None79        self.close_data_cb = None80        self.adjclose_data_cb = None81        self.volume_data_cb = None82        self.save_fig_btn = None83        self.graph_options_layout = None84        self.sample_rate_label = None85        self.sample_rate_combo = None86        self.time_delta_label = None87        self.time_delta_combo = None88        self.main_v_divider = None89        self.options_layout = None90        self.source_selection_layout = None91        self.source_label = None92        self.source_combo = None93        self.stock_selection_layout = None94        
self.stock_label = None95        self.stock_combo = None96        self.robinhood_account_name_textedit = None97        self.robinhood_password_textedit = None98        self.robinhood_login_button = None99        self.top_h_divider = None100        self.second_from_top_h_divider = None101        self.momentum_indicator_label = None102        self.momentum_indicator_layout = None103        self.rsi_cb = None104        self.rsi_time_frame_label = None105        self.rsi_time_frame_text = None106        self.williams_r_cb = None107        self.cmo_cb = None108        self.macd_cb = None109        self.roc_cb = None110        self.middle_h_divider = None111        self.averages_label = None112        self.averages_layout = None113        self.wma_cb = None114        self.ema_cb = None115        self.sma_cb = None116        self.hma_cb = None117        self.trix_cb = None118        self.bottom_h_divider = None119        self.trend_indicators_label = None120        self.trend_indicators_layout = None121        self.cci_cb = None122        self.dpo_cb = None123        self.cmf_cb = None124        self.adx_cb = None125        self.force_index_cb = None126        self.statusbar = None127        self.setupUi()128    def setupUi(self) -> None:129        """130        :return:131        """132        # Generated Setup Code133        self.setObjectName("StockAnalysisTool_Ui")134        self.central_widget = QWidget(self)135        self.central_widget.setObjectName("central_widget")136        self.central_layout = QVBoxLayout(self.central_widget)137        self.central_layout.setObjectName("central_layout")138        self.title_label = QLabel(self.central_widget)139        sizePolicy = QSizePolicy(QSizePolicy.Preferred, QSizePolicy.Preferred)140        sizePolicy.setHorizontalStretch(0)141        sizePolicy.setVerticalStretch(0)142        sizePolicy.setHeightForWidth(self.title_label.sizePolicy().hasHeightForWidth())143        self.title_label.setSizePolicy(sizePolicy)144        
font = QFont()145        font.setFamily("Garamond")146        font.setPointSize(22)147        font.setBold(True)148        font.setWeight(75)149        self.title_label.setFont(font)150        self.title_label.setAlignment(Qt.AlignCenter)151        self.title_label.setObjectName("title_label")152        self.central_layout.addWidget(self.title_label)153        self.title_h_divider = QFrame(self.central_widget)154        self.title_h_divider.setFrameShape(QFrame.HLine)155        self.title_h_divider.setFrameShadow(QFrame.Sunken)156        self.title_h_divider.setObjectName("title_h_divider")157        self.central_layout.addWidget(self.title_h_divider)158        self.display_layout = QHBoxLayout()159        self.display_layout.setObjectName("display_layout")160        self.graph_layout = QVBoxLayout()161        self.graph_layout.setObjectName("graph_layout")162        self.date_axis = pg.DateAxisItem(orientation='bottom')163        self.graph = pg.PlotWidget(axisItems={'bottom': self.date_axis})164        self.graph_legend = self.graph.addLegend()165        self.graph_layout.addWidget(self.graph)166        self.data_selection_layout = QGridLayout()167        self.open_data_cb = QCheckBox(self.central_widget)168        self.open_data_cb.setObjectName("open")169        self.data_selection_layout.addWidget(self.open_data_cb, 0, 0, 1, 1)170        self.high_data_cb = QCheckBox(self.central_widget)171        self.high_data_cb.setObjectName("high")172        self.data_selection_layout.addWidget(self.high_data_cb, 0, 1, 1, 1)173        self.low_data_cb = QCheckBox(self.central_widget)174        self.low_data_cb.setObjectName("low")175        self.data_selection_layout.addWidget(self.low_data_cb, 0, 2, 1, 1)176        self.close_data_cb = QCheckBox(self.central_widget)177        self.close_data_cb.setObjectName("close")178        self.data_selection_layout.addWidget(self.close_data_cb, 0, 3, 1, 1)179        self.adjclose_data_cb = QCheckBox(self.central_widget)180        
self.adjclose_data_cb.setObjectName("adjclose")181        self.data_selection_layout.addWidget(self.adjclose_data_cb, 0, 4, 1, 1)182        self.volume_data_cb = QCheckBox(self.central_widget)183        self.volume_data_cb.setObjectName("volume")184        self.data_selection_layout.addWidget(self.volume_data_cb, 0, 5, 1, 1)185        self.graph_layout.addLayout(self.data_selection_layout)186        self.save_fig_btn = QPushButton(self.central_widget)187        self.save_fig_btn.setObjectName("save_fig_btn")188        self.graph_layout.addWidget(self.save_fig_btn)189        self.graph_options_layout = QGridLayout()190        self.sample_rate_label = QLabel(self.central_widget)191        self.sample_rate_label.setText("Sample Rate:")192        self.graph_options_layout.addWidget(self.sample_rate_label, 0, 0, 1, 1)193        self.sample_rate_combo = QComboBox(self.central_widget)194        self.sample_rate_combo.addItems(self.sample_rates)195        self.graph_options_layout.addWidget(self.sample_rate_combo, 0, 1, 1, 1)196        self.time_delta_label = QLabel(self.central_widget)197        self.time_delta_label.setText("Time delta:")198        self.graph_options_layout.addWidget(self.time_delta_label, 0, 2, 1, 1)199        self.time_delta_combo = QComboBox(self.central_widget)200        self.time_delta_combo.addItems(self.time_deltas)201        self.graph_options_layout.addWidget(self.time_delta_combo, 0, 3, 1, 1)202        self.graph_layout.addLayout(self.graph_options_layout)203        self.display_layout.addLayout(self.graph_layout)204        self.main_v_divider = QFrame(self.central_widget)205        self.main_v_divider.setFrameShape(QFrame.VLine)206        self.main_v_divider.setFrameShadow(QFrame.Sunken)207        self.main_v_divider.setObjectName("main_v_divider")208        self.display_layout.addWidget(self.main_v_divider)209        self.options_layout = QVBoxLayout()210        self.options_layout.setObjectName("options_layout")211        
robinhood_account_layout = QHBoxLayout()212        robinhood_account_name_label = QLabel(self.central_widget)213        robinhood_account_name_label.setText("Robinhood Account:")214        self.robinhood_account_name_textedit = QTextEdit()215        robinhood_account_layout.addWidget(robinhood_account_name_label)216        robinhood_account_layout.addWidget(self.robinhood_account_name_textedit)217        self.options_layout.addLayout(robinhood_account_layout)218        robinhood_password_layout = QHBoxLayout()219        robinhood_account_password_label = QLabel(self.central_widget)220        robinhood_account_password_label.setText("Robinhood Password:")221        self.robinhood_password_textedit = QTextEdit()222        robinhood_password_layout.addWidget(robinhood_account_password_label)223        robinhood_password_layout.addWidget(self.robinhood_password_textedit)224        self.options_layout.addLayout(robinhood_password_layout)225        self.robinhood_login_button = QPushButton()226        self.robinhood_login_button.setObjectName("robinhood_login_button")227        self.options_layout.addWidget(self.robinhood_login_button)228        self.top_h_divider = QFrame(self.central_widget)229        self.top_h_divider.setFrameShape(QFrame.HLine)230        self.top_h_divider.setFrameShadow(QFrame.Sunken)231        self.top_h_divider.setObjectName("top_h_divider")232        self.options_layout.addWidget(self.top_h_divider)233        self.source_selection_layout = QHBoxLayout()234        self.source_selection_layout.setObjectName("source_selection_layout")235        self.source_label = QLabel(self.central_widget)236        self.source_label.setObjectName("source_label")237        self.source_selection_layout.addWidget(self.source_label)238        self.source_combo = QComboBox(self.central_widget)239        self.source_combo.setObjectName("source_combo")240        self.source_combo.addItem("")241        self.source_combo.addItem("")242        
self.source_combo.addItem("")243        self.source_selection_layout.addWidget(self.source_combo)244        self.options_layout.addLayout(self.source_selection_layout)245        self.stock_selection_layout = QHBoxLayout()246        self.stock_selection_layout.setObjectName("stock_selection_layout")247        self.stock_label = QLabel(self.central_widget)248        self.stock_label.setObjectName("stock_label")249        self.stock_selection_layout.addWidget(self.stock_label)250        self.stock_combo = QComboBox(self.central_widget)251        self.stock_combo.setObjectName("stock_combo")252        self.stock_combo.addItems(self.valid_stock_tickers)253        self.stock_selection_layout.addWidget(self.stock_combo)254        self.options_layout.addLayout(self.stock_selection_layout)255        self.second_from_top_h_divider = QFrame(self.central_widget)256        self.second_from_top_h_divider.setFrameShape(QFrame.HLine)257        self.second_from_top_h_divider.setFrameShadow(QFrame.Sunken)258        self.second_from_top_h_divider.setObjectName("second_from_top_h_divider")259        self.options_layout.addWidget(self.second_from_top_h_divider)260        self.momentum_indicator_label = QLabel(self.central_widget)261        self.momentum_indicator_label.setObjectName("momentum_indicator_label")262        self.options_layout.addWidget(self.momentum_indicator_label)263        # Momentum Indicators264        self.momentum_indicator_layout = QGridLayout()265        self.rsi_cb = QCheckBox(self.central_widget)266        self.rsi_cb.setObjectName("rsi_cb")267        self.momentum_indicator_layout.addWidget(self.rsi_cb, 0, 0, 1, 1)268        self.rsi_time_frame_label = QLabel(self.central_widget)269        self.rsi_time_frame_label.setText("Time frame:")270        self.momentum_indicator_layout.addWidget(self.rsi_time_frame_label, 0, 1, 1, 1)271        self.rsi_time_frame_text = QTextEdit(self.central_widget)272        font_metric = 
QFontMetrics(self.rsi_time_frame_text.font())273        self.rsi_time_frame_text.setFixedHeight(font_metric.lineSpacing())274        self.rsi_time_frame_text.setFixedWidth(50)275        self.rsi_time_frame_text.setText(str(self.rsi_n))276        self.momentum_indicator_layout.addWidget(self.rsi_time_frame_text, 0, 2, 1, 1)277        self.williams_r_cb = QCheckBox(self.central_widget)278        self.williams_r_cb.setObjectName("williams_r_cb")279        self.momentum_indicator_layout.addWidget(self.williams_r_cb, 1, 0, 1, 1)280        self.cmo_cb = QCheckBox(self.central_widget)281        self.cmo_cb.setObjectName("cmo_cb")282        self.momentum_indicator_layout.addWidget(self.cmo_cb, 2, 0, 1, 1)283        self.macd_cb = QCheckBox(self.central_widget)284        self.macd_cb.setObjectName("macd_cb")285        self.momentum_indicator_layout.addWidget(self.macd_cb, 3, 0, 1, 1)286        self.roc_cb = QCheckBox(self.central_widget)287        self.roc_cb.setObjectName("roc_cb")288        self.momentum_indicator_layout.addWidget(self.roc_cb, 4, 0, 1, 1)289        self.middle_h_divider = QFrame(self.central_widget)290        self.middle_h_divider.setFrameShape(QFrame.HLine)291        self.middle_h_divider.setFrameShadow(QFrame.Sunken)292        self.middle_h_divider.setObjectName("middle_h_divider")293        self.options_layout.addLayout(self.momentum_indicator_layout)294        self.options_layout.addWidget(self.middle_h_divider)295        # Averages Indicators296        self.averages_label = QLabel(self.central_widget)297        self.averages_label.setObjectName("averages_label")298        self.options_layout.addWidget(self.averages_label)299        self.wma_cb = QCheckBox(self.central_widget)300        self.wma_cb.setObjectName("wma_cb")301        self.options_layout.addWidget(self.wma_cb)302        self.ema_cb = QCheckBox(self.central_widget)303        self.ema_cb.setObjectName("ema_cb")304        self.options_layout.addWidget(self.ema_cb)305        self.sma_cb = 
QCheckBox(self.central_widget)306        self.sma_cb.setObjectName("sma_cb")307        self.options_layout.addWidget(self.sma_cb)308        self.hma_cb = QCheckBox(self.central_widget)309        self.hma_cb.setObjectName("hma_cb")310        self.options_layout.addWidget(self.hma_cb)311        self.trix_cb = QCheckBox(self.central_widget)312        self.trix_cb.setObjectName("trix_cb")313        self.options_layout.addWidget(self.trix_cb)314        self.bottom_h_divider = QFrame(self.central_widget)315        self.bottom_h_divider.setFrameShape(QFrame.HLine)316        self.bottom_h_divider.setFrameShadow(QFrame.Sunken)317        self.bottom_h_divider.setObjectName("bottom_h_divider")318        self.options_layout.addWidget(self.bottom_h_divider)319        # Trend Indicators320        self.trend_indicators_label = QLabel(self.central_widget)321        self.trend_indicators_label.setObjectName("trend_indicators_label")322        self.options_layout.addWidget(self.trend_indicators_label)323        self.cci_cb = QCheckBox(self.central_widget)324        self.cci_cb.setObjectName("cci_cb")325        self.options_layout.addWidget(self.cci_cb)326        self.dpo_cb = QCheckBox(self.central_widget)327        self.dpo_cb.setObjectName("dpo_cb")328        self.options_layout.addWidget(self.dpo_cb)329        self.cmf_cb = QCheckBox(self.central_widget)330        self.cmf_cb.setObjectName("cmf_cb")331        self.options_layout.addWidget(self.cmf_cb)332        self.adx_cb = QCheckBox(self.central_widget)333        self.adx_cb.setObjectName("adx_cb")334        self.options_layout.addWidget(self.adx_cb)335        self.force_index_cb = QCheckBox(self.central_widget)336        self.force_index_cb.setObjectName("checkBox_14")337        self.options_layout.addWidget(self.force_index_cb)338        self.display_layout.addLayout(self.options_layout)339        self.central_layout.addLayout(self.display_layout)340        self.setCentralWidget(self.central_widget)341        self.statusbar = 
QStatusBar(self)342        self.statusbar.setObjectName("statusbar")343        self.setStatusBar(self.statusbar)344        self.retranslateUi()345        self.setCallbacks()346        QMetaObject.connectSlotsByName(self)347    def setCallbacks(self) -> None:348        """349        :return:350        """351        self.sample_rate_combo.currentIndexChanged.connect(352            lambda: Ui_StockAnalysisTool.Callbacks.sample_rate_combo_changed(combo=self.sample_rate_combo)353        )354        self.time_delta_combo.currentIndexChanged.connect(355            lambda: Ui_StockAnalysisTool.Callbacks.time_delta_combo_changed(stock_analysis_tool=self,356                                                                            combo=self.time_delta_combo)357        )358        self.source_combo.currentIndexChanged.connect(359            lambda: Ui_StockAnalysisTool.Callbacks.source_combo_changed(combo=self.source_combo)360        )361        self.stock_combo.currentIndexChanged.connect(362            lambda: Ui_StockAnalysisTool.Callbacks.stock_combo_changed(stock_analysis_tool=self,363                                                                       combo=self.stock_combo,364                                                                       source=self.source))365        self.open_data_cb.clicked.connect(366            lambda: Ui_StockAnalysisTool.Callbacks.open_data_cb_pressed(stock_analysis_tool=self,367                                                                        cb=self.open_data_cb)368        )369        self.high_data_cb.clicked.connect(370            lambda: Ui_StockAnalysisTool.Callbacks.high_data_cb_pressed(stock_analysis_tool=self,371                                                                        cb=self.high_data_cb)372        )373        self.low_data_cb.clicked.connect(374            lambda: Ui_StockAnalysisTool.Callbacks.low_data_cb_pressed(stock_analysis_tool=self,375                                                              
         cb=self.low_data_cb)376        )377        self.close_data_cb.clicked.connect(378            lambda: Ui_StockAnalysisTool.Callbacks.close_data_cb_pressed(stock_analysis_tool=self,379                                                                         cb=self.close_data_cb)380        )381        self.adjclose_data_cb.clicked.connect(382            lambda: Ui_StockAnalysisTool.Callbacks.adjclose_data_cb_pressed(stock_analysis_tool=self,383                                                                            cb=self.adjclose_data_cb)384        )385        self.volume_data_cb.clicked.connect(386            lambda: Ui_StockAnalysisTool.Callbacks.volume_data_cb_pressed(stock_analysis_tool=self,387                                                                          cb=self.volume_data_cb)388        )389        self.robinhood_account_name_textedit.textChanged.connect(390            lambda: Ui_StockAnalysisTool.Callbacks.robinhood_account_name_text_changed(391                stock_analysis_tool=self,392                robinhood_account_name_textedit=self.robinhood_account_name_textedit393            )394        )395        self.robinhood_password_textedit.textChanged.connect(396            lambda: Ui_StockAnalysisTool.Callbacks.robinhood_password_text_changed(397                stock_analysis_tool=self,398                robinhood_password_textedit=self.robinhood_password_textedit399            )400        )401        self.robinhood_login_button.clicked.connect(402            lambda: Ui_StockAnalysisTool.Callbacks.robinhood_login(403                robinhood_login_name=self.robinhood_account_name,404                robinhood_password=self.robinhood_account_password405            )406        )407        self.rsi_cb.clicked.connect(408            lambda: Ui_StockAnalysisTool.Callbacks.rsi_cb_pressed(stock_analysis_tool=self,409                                                                  cb=self.rsi_cb)410        )411        
self.rsi_time_frame_text.textChanged.connect(412            lambda: Ui_StockAnalysisTool.Callbacks.rsi_time_frame_text_changed(413                stock_analysis_tool=self,414                rsi_time_frame_text=self.rsi_time_frame_text415            )416        )417        self.williams_r_cb.clicked.connect(418            lambda: Ui_StockAnalysisTool.Callbacks.williams_r_cb_pressed(stock_analysis_tool=self,419                                                                         cb=self.williams_r_cb))420        self.cmo_cb.clicked.connect(421            lambda: Ui_StockAnalysisTool.Callbacks.cmo_cb_pressed(stock_analysis_tool=self,422                                                                  cb=self.cmo_cb))423        self.macd_cb.clicked.connect(424            lambda: Ui_StockAnalysisTool.Callbacks.macd_cb_pressed(stock_analysis_tool=self,425                                                                   cb=self.macd_cb))426        self.roc_cb.clicked.connect(427            lambda: Ui_StockAnalysisTool.Callbacks.roc_cb_pressed(stock_analysis_tool=self,428                                                                  cb=self.roc_cb))429        self.wma_cb.clicked.connect(430            lambda: Ui_StockAnalysisTool.Callbacks.wma_cb_pressed(cb=self.wma_cb))431        self.ema_cb.clicked.connect(432            lambda: Ui_StockAnalysisTool.Callbacks.ema_cb_pressed(cb=self.ema_cb))433        self.sma_cb.clicked.connect(434            lambda: Ui_StockAnalysisTool.Callbacks.sma_cb_pressed(cb=self.sma_cb))435        self.hma_cb.clicked.connect(436            lambda: Ui_StockAnalysisTool.Callbacks.hma_cb_pressed(cb=self.hma_cb))437        self.trix_cb.clicked.connect(438            lambda: Ui_StockAnalysisTool.Callbacks.trix_cb_pressed(cb=self.trix_cb))439        self.cci_cb.clicked.connect(440            lambda: Ui_StockAnalysisTool.Callbacks.cci_cb_pressed(stock_analysis_tool=self,441                                                                  
cb=self.cci_cb))442        self.dpo_cb.clicked.connect(443            lambda: Ui_StockAnalysisTool.Callbacks.dpo_cb_pressed(stock_analysis_tool=self,444                                                                  cb=self.dpo_cb))445        self.cmf_cb.clicked.connect(446            lambda: Ui_StockAnalysisTool.Callbacks.cmf_cb_pressed(stock_analysis_tool=self,447                                                                  cb=self.cmf_cb))448        self.adx_cb.clicked.connect(449            lambda: Ui_StockAnalysisTool.Callbacks.adx_cb_pressed(stock_analysis_tool=self,450                                                                  cb=self.adx_cb))451        self.force_index_cb.clicked.connect(452            lambda: Ui_StockAnalysisTool.Callbacks.force_index_cb_pressed(cb=self.force_index_cb))453    def retranslateUi(self) -> None:454        """455        :return:456        """457        _translate = QCoreApplication.translate458        self.setWindowTitle(_translate("StockAnalysisTool_Ui", "Stock Analysis Tool"))459        self.title_label.setText(_translate("StockAnalysisTool_Ui", "Robbin Stock Analysis Tool"))460        self.save_fig_btn.setText(_translate("StockAnalysisTool_Ui", "Export Graph"))461        self.source_label.setText(_translate("StockAnalysisTool_Ui", "Source:"))462        self.source_combo.setItemText(0, _translate("StockAnalysisTool_Ui", "yahoo_fin"))463        self.source_combo.setItemText(1, _translate("StockAnalysisTool_Ui", "ASTRASS"))464        self.source_combo.setItemText(2, _translate("StockAnalysisTool_Ui", "yfinance"))465        self.stock_label.setText(_translate("StockAnalysisTool_Ui", "Stock:"))466        self.momentum_indicator_label.setText(_translate("StockAnalysisTool_Ui", "Momentum Indicators"))467        self.open_data_cb.setText(_translate("StockAnalysisTool_Ui", "open"))468        self.high_data_cb.setText(_translate("StockAnalysisTool_Ui", "high"))469        
self.low_data_cb.setText(_translate("StockAnalysisTool_Ui", "low"))470        self.close_data_cb.setText(_translate("StockAnalysisTool_Ui", "close"))471        self.adjclose_data_cb.setText(_translate("StockAnalysisTool_Ui", "adjclose"))472        self.volume_data_cb.setText(_translate("StockAnalysisTool_Ui", "volume"))473        self.robinhood_login_button.setText(_translate("StockAnalysisTool_Ui", "Login"))474        self.rsi_cb.setText(_translate("StockAnalysisTool_Ui", "RSI"))475        self.williams_r_cb.setText(_translate("StockAnalysisTool_Ui", "WilliamsR"))476        self.cmo_cb.setText(_translate("StockAnalysisTool_Ui", "CMO"))477        self.macd_cb.setText(_translate("StockAnalysisTool_Ui", "MACD"))478        self.roc_cb.setText(_translate("StockAnalysisTool_Ui", "ROC"))479        self.averages_label.setText(_translate("StockAnalysisTool_Ui", "Averages"))480        self.wma_cb.setText(_translate("StockAnalysisTool_Ui", "WMA"))481        self.ema_cb.setText(_translate("StockAnalysisTool_Ui", "EMA"))482        self.sma_cb.setText(_translate("StockAnalysisTool_Ui", "SMA"))483        self.hma_cb.setText(_translate("StockAnalysisTool_Ui", "HMA"))484        self.trix_cb.setText(_translate("StockAnalysisTool_Ui", "TRIX"))485        self.trend_indicators_label.setText(_translate("StockAnalysisTool_Ui", "Trend Indicators"))486        self.cci_cb.setText(_translate("StockAnalysisTool_Ui", "CCI"))487        self.dpo_cb.setText(_translate("StockAnalysisTool_Ui", "DPO"))488        self.cmf_cb.setText(_translate("StockAnalysisTool_Ui", "CMF"))489        self.adx_cb.setText(_translate("StockAnalysisTool_Ui", "ADX"))490        self.force_index_cb.setText(_translate("StockAnalysisTool_Ui", "Force Index"))491    def add_column_to_graph(self, column_name: str,492                            color: Union[QColor, None] = None) -> None:493        """494        :param column_name:495        :param color:496        :return:497        """498        relevant_df = 
self.get_relevant_data_frame()499        x = [datetime.timestamp(date_time) for date_time in relevant_df.index]500        if color is None:501            self.data_lines[column_name] = self.graph.plot(x=x,502                                                           y=relevant_df[column_name],503                                                           pen=pg.mkPen(color=self.colors[list(self.colors.keys())[self.color_iterator]]),504                                                           name=column_name)505            self.color_iterator += 1506            self.color_iterator %= len(self.colors.keys())507        else:508            self.data_lines[column_name] = self.graph.plot(x=x,509                                                           y=relevant_df[column_name],510                                                           pen=pg.mkPen(color=color),511                                                           name=column_name)512    def remove_column_from_graph(self, column_name: str) -> None:513        """514        :param column_name:515        :return:516        """517        self.graph.removeItem(self.data_lines[column_name])518        del self.data_lines[column_name]519    def update_data_on_graph(self, data_frame: DataFrame) -> None:520        """521        :param data_frame:522        :return:523        """524        x = [datetime.timestamp(date_time) for date_time in data_frame.index]525        for data_line_key, data_line in self.data_lines.items():526            data_line.setData(x=x, y=data_frame[data_line_key])527    def get_relevant_data_frame(self) -> DataFrame:528        """529        :return:530        """531        return self.df.truncate(before=self.first_date, after=self.last_date)532    class Callbacks:533        @staticmethod534        def generic_data_cb_pressed(stock_analysis_tool: QMainWindow,535                                    checked: bool,536                                    df_column_name: str) -> None:537            if 
checked:538                # Add open data to graph539                stock_analysis_tool.add_column_to_graph(column_name=df_column_name)540            else:541                # Remove open data from graph542                stock_analysis_tool.remove_column_from_graph(column_name=df_column_name)543        @staticmethod544        def source_combo_changed(combo: QComboBox) -> None:545            combo_text = combo.currentText()546        @staticmethod547        def stock_combo_changed(stock_analysis_tool: QMainWindow,548                                combo: QComboBox,549                                source: IntEnum) -> None:550            combo_text = combo.currentText()551            if source == Ui_StockAnalysisTool.StockSources.YAHOO_FIN:552                stock_analysis_tool.df = retrieve_yahoo_fin_stock_data(ticker=combo_text)553            elif source == Ui_StockAnalysisTool.StockSources.ASTRASS:554                # retrieve data from ASTRASS555                stock_analysis_tool.df = None556            elif source == Ui_StockAnalysisTool.StockSources.YFINANCE:557                # retrieve data from YFinance558                stock_analysis_tool.df = None559            stock_analysis_tool.update_data_on_graph(data_frame=stock_analysis_tool.df)560        @staticmethod561        def sample_rate_combo_changed(combo: QComboBox) -> None:562            combo_text = combo.currentText()563        @staticmethod564        def time_delta_combo_changed(stock_analysis_tool: QMainWindow,565                                     combo: QComboBox) -> None:566            """567            :param stock_analysis_tool:568            :param combo:569            :return:570            """571            new_time_delta = Ui_StockAnalysisTool.TimeDeltas(combo.currentText())572            min_date = stock_analysis_tool.df.index[0].to_pydatetime()573            max_date = stock_analysis_tool.df.index[-1].to_pydatetime()574            if new_time_delta == 
Ui_StockAnalysisTool.TimeDeltas.FIVE_YEARS:575                min_date = max_date - timedelta(days=365*5)576            elif new_time_delta == Ui_StockAnalysisTool.TimeDeltas.ONE_YEAR:577                min_date = max_date - timedelta(days=365*1)578            elif new_time_delta == Ui_StockAnalysisTool.TimeDeltas.YEAR_TO_DATE:579                min_date = min_date.replace(year=max_date.year, month=1, day=1)580            elif new_time_delta == Ui_StockAnalysisTool.TimeDeltas.SIX_MONTHS:581                min_date = max_date - dateutil.relativedelta.relativedelta(months=6)582            elif new_time_delta == Ui_StockAnalysisTool.TimeDeltas.ONE_MONTH:583                min_date = max_date - dateutil.relativedelta.relativedelta(months=1)584            elif new_time_delta == Ui_StockAnalysisTool.TimeDeltas.FIVE_DAYS:585                min_date = max_date - timedelta(days=5)586            stock_analysis_tool.first_date = min_date587            stock_analysis_tool.last_date = max_date588            stock_analysis_tool.update_data_on_graph(stock_analysis_tool.get_relevant_data_frame())589        @staticmethod590        def open_data_cb_pressed(stock_analysis_tool: QMainWindow,591                                 cb: QCheckBox) -> None:592            Ui_StockAnalysisTool.Callbacks.generic_data_cb_pressed(593                stock_analysis_tool=stock_analysis_tool,594                checked=cb.isChecked(),595                df_column_name='open'596            )597        @staticmethod598        def high_data_cb_pressed(stock_analysis_tool: QMainWindow,599                                 cb: QCheckBox) -> None:600            Ui_StockAnalysisTool.Callbacks.generic_data_cb_pressed(601                stock_analysis_tool=stock_analysis_tool,602                checked=cb.isChecked(),603                df_column_name='high'604            )605        @staticmethod606        def low_data_cb_pressed(stock_analysis_tool: QMainWindow,607                                cb: QCheckBox) -> 
# Checkbox / text-edit callbacks. In the original layout these are indented as
# members of a nested class (presumably Ui_StockAnalysisTool.Callbacks --
# confirm against the full file).
@staticmethod
def close_data_cb_pressed(stock_analysis_tool: QMainWindow,
                          cb: QCheckBox) -> None:
    """
    Relay the state of the 'close' checkbox to ``generic_data_cb_pressed``.

    :param stock_analysis_tool: passed through unchanged to the shared handler
    :param cb: checkbox whose ``isChecked()`` value is forwarded as ``checked``
    :return:
    """
    Ui_StockAnalysisTool.Callbacks.generic_data_cb_pressed(
        stock_analysis_tool=stock_analysis_tool,
        checked=cb.isChecked(),
        df_column_name='close'
    )


@staticmethod
def adjclose_data_cb_pressed(stock_analysis_tool: QMainWindow,
                             cb: QCheckBox) -> None:
    """
    Relay the state of the 'adjclose' checkbox to ``generic_data_cb_pressed``.

    :param stock_analysis_tool: passed through unchanged to the shared handler
    :param cb: checkbox whose ``isChecked()`` value is forwarded as ``checked``
    :return:
    """
    Ui_StockAnalysisTool.Callbacks.generic_data_cb_pressed(
        stock_analysis_tool=stock_analysis_tool,
        checked=cb.isChecked(),
        df_column_name='adjclose'
    )


@staticmethod
def volume_data_cb_pressed(stock_analysis_tool: QMainWindow,
                           cb: QCheckBox) -> None:
    """
    Relay the state of the 'volume' checkbox to ``generic_data_cb_pressed``.

    :param stock_analysis_tool: passed through unchanged to the shared handler
    :param cb: checkbox whose ``isChecked()`` value is forwarded as ``checked``
    :return:
    """
    Ui_StockAnalysisTool.Callbacks.generic_data_cb_pressed(
        stock_analysis_tool=stock_analysis_tool,
        checked=cb.isChecked(),
        df_column_name='volume'
    )


@staticmethod
def robinhood_account_name_text_changed(stock_analysis_tool: QMainWindow,
                                        robinhood_account_name_textedit: QTextEdit) -> None:
    """
    Copy the text edit's plain text into ``robinhood_account_name``.

    :param stock_analysis_tool: object that receives the ``robinhood_account_name`` attribute
    :param robinhood_account_name_textedit: widget read via ``toPlainText()``
    :return:
    """
    text = robinhood_account_name_textedit.toPlainText()
    stock_analysis_tool.robinhood_account_name = text


@staticmethod
def robinhood_password_text_changed(stock_analysis_tool: QMainWindow,
                                    robinhood_password_textedit: QTextEdit) -> None:
    """
    Copy the text edit's plain text into ``robinhood_password``.

    :param stock_analysis_tool: object that receives the ``robinhood_password`` attribute
    :param robinhood_password_textedit: widget read via ``toPlainText()``
    :return:
    """
    text = robinhood_password_textedit.toPlainText()
    stock_analysis_tool.robinhood_password = text


@staticmethod
def robinhood_login(robinhood_account_name: str,
                    robinhood_password: str) -> None:
    """
    Placeholder: the body is ``pass``, so neither argument is used yet.

    :param robinhood_account_name: unused
    :param robinhood_password: unused
    :return:
    """
    pass


@staticmethod
def rsi_cb_pressed(stock_analysis_tool: QMainWindow,
                   cb: QCheckBox) -> None:
    """
    Add or remove the RSI series and its two constant threshold lines.

    :param stock_analysis_tool: object providing ``df``, ``rsi_n``,
        ``protected_colors``, ``add_column_to_graph`` and ``remove_column_from_graph``
    :param cb: checked adds the columns and lines, unchecked removes them
    :return:
    """
    rsi_columns = ['rsi', 'rsi overbought', 'rsi oversold']
    if cb.isChecked():
        # Add RSI to Display Graph
        stock_analysis_tool.df['rsi'] = rsi(close=stock_analysis_tool.df['close'],
                                            n=stock_analysis_tool.rsi_n)
        # Constant columns so the thresholds are drawn as horizontal lines.
        stock_analysis_tool.df['rsi overbought'] = 70
        stock_analysis_tool.df['rsi oversold'] = 30
        stock_analysis_tool.add_column_to_graph(column_name='rsi')
        stock_analysis_tool.add_column_to_graph(column_name='rsi overbought',
                                                color=stock_analysis_tool.protected_colors['red'])
        stock_analysis_tool.add_column_to_graph(column_name='rsi oversold',
                                                color=stock_analysis_tool.protected_colors['green'])
    else:
        # Remove RSI from Display Graph
        for column_name in rsi_columns:
            stock_analysis_tool.remove_column_from_graph(column_name=column_name)
        # One drop call copies the frame once instead of three times.
        stock_analysis_tool.df = stock_analysis_tool.df.drop(columns=rsi_columns)
# In the original layout these are indented as members of a nested class
# (presumably Ui_StockAnalysisTool.Callbacks -- confirm against the full file).
@staticmethod
def rsi_time_frame_text_changed(stock_analysis_tool: QMainWindow,
                                rsi_time_frame_text: QTextEdit):
    """
    Update ``rsi_n`` from the text edit and refresh the RSI line if it is shown.

    Empty text is ignored. Text that is not an integer, or an integer that is
    not positive, prints "Invalid RSI Input" and leaves ``rsi_n`` untouched.

    :param stock_analysis_tool: object providing ``df``, ``rsi_n`` and ``data_lines``
    :param rsi_time_frame_text: widget read via ``toPlainText()``
    :return:
    """
    text = rsi_time_frame_text.toPlainText()
    if text == "":
        return
    try:
        period = int(text)
    except ValueError:
        period = 0  # funnel non-numeric text into the rejection path below
    if period <= 0:
        # Reject before storing: a zero or negative window cannot be used to
        # compute the indicator and would otherwise stay in rsi_n.
        print("Invalid RSI Input")
        return
    stock_analysis_tool.rsi_n = period
    if 'rsi' not in stock_analysis_tool.df.columns:
        return
    try:
        # Drop-then-assign keeps the original behaviour of moving 'rsi' to the
        # last column position.
        stock_analysis_tool.df = stock_analysis_tool.df.drop("rsi", axis=1)
        stock_analysis_tool.df['rsi'] = rsi(close=stock_analysis_tool.df['close'],
                                            n=stock_analysis_tool.rsi_n)
        x = [datetime.timestamp(date_time) for date_time in stock_analysis_tool.df.index]
        stock_analysis_tool.data_lines['rsi'].setData(x, stock_analysis_tool.df['rsi'])
    except ValueError:
        print("Invalid RSI Input")


@staticmethod
def williams_r_cb_pressed(stock_analysis_tool: QMainWindow,
                          cb: QCheckBox) -> None:
    """
    Add or remove the Williams %R series and its two constant threshold lines.

    :param stock_analysis_tool: object providing ``df``, ``protected_colors``,
        ``add_column_to_graph`` and ``remove_column_from_graph``
    :param cb: checked adds the columns and lines, unchecked removes them
    :return:
    """
    williams_columns = ['WilliamsR', 'WilliamsR overbought', 'WilliamsR oversold']
    if cb.isChecked():
        # Add WilliamsR to Display Graph
        stock_analysis_tool.df['WilliamsR'] = wr(stock_analysis_tool.df['high'],
                                                 stock_analysis_tool.df['low'],
                                                 stock_analysis_tool.df['close'])
        # Constant columns so the thresholds are drawn as horizontal lines.
        stock_analysis_tool.df['WilliamsR overbought'] = -20
        stock_analysis_tool.df['WilliamsR oversold'] = -80
        stock_analysis_tool.add_column_to_graph(column_name='WilliamsR')
        stock_analysis_tool.add_column_to_graph(column_name='WilliamsR overbought',
                                                color=stock_analysis_tool.protected_colors['red'])
        stock_analysis_tool.add_column_to_graph(column_name='WilliamsR oversold',
                                                color=stock_analysis_tool.protected_colors['green'])
    else:
        # Remove WilliamsR from Display Graph
        for column_name in williams_columns:
            stock_analysis_tool.remove_column_from_graph(column_name=column_name)
        # One drop call copies the frame once instead of three times.
        stock_analysis_tool.df = stock_analysis_tool.df.drop(columns=williams_columns)
# In the original layout this is indented as a member of a nested class
# (presumably Ui_StockAnalysisTool.Callbacks -- confirm against the full file).
@staticmethod
def cmo_cb_pressed(stock_analysis_tool: QMainWindow,
                   cb: QCheckBox) -> None:
    """
    Add or remove the 'cmo' series and its two constant threshold lines.

    :param stock_analysis_tool: object providing ``df``, ``cmo_n``,
        ``protected_colors``, ``add_column_to_graph`` and ``remove_column_from_graph``
    :param cb: checked adds the columns and lines, unchecked removes them
    :return:
    """
    cmo_columns = ['cmo', 'cmo overbought', 'cmo oversold']
    if cb.isChecked():
        stock_analysis_tool.df['cmo'] = chande_momentum_oscillator(close_data=stock_analysis_tool.df['close'],
                                                                   period=stock_analysis_tool.cmo_n)
        # Constant columns so the thresholds are drawn as horizontal lines.
        stock_analysis_tool.df['cmo overbought'] = 50
        stock_analysis_tool.df['cmo oversold'] = -50
        # Add CMO to Display Graph
        stock_analysis_tool.add_column_to_graph(column_name='cmo')
        stock_analysis_tool.add_column_to_graph(column_name='cmo overbought',
                                                color=stock_analysis_tool.protected_colors['red'])
        stock_analysis_tool.add_column_to_graph(column_name='cmo oversold',
                                                color=stock_analysis_tool.protected_colors['green'])
    else:
        # Remove CMO from Display Graph
        for column_name in cmo_columns:
            stock_analysis_tool.remove_column_from_graph(column_name=column_name)
        # One drop call copies the frame once instead of three times.
        stock_analysis_tool.df = stock_analysis_tool.df.drop(columns=cmo_columns)
# In the original layout this is indented as a member of a nested class
# (presumably Ui_StockAnalysisTool.Callbacks -- confirm against the full file).
@staticmethod
def macd_cb_pressed(stock_analysis_tool: QMainWindow,
                    cb: QCheckBox) -> None:
    """
    Add or remove the 'MACD' column and its graph line.

    The stored column is the ``macd`` result minus the ``macd_signal`` result.

    :param stock_analysis_tool: object providing ``df``, ``macd_slow``,
        ``macd_fast``, ``macd_sign``, ``add_column_to_graph`` and
        ``remove_column_from_graph``
    :param cb: checked adds the column and line, unchecked removes them
    :return:
    """
    if not cb.isChecked():
        # Remove MACD from Display Graph
        stock_analysis_tool.remove_column_from_graph(column_name='MACD')
        stock_analysis_tool.df = stock_analysis_tool.df.drop("MACD", axis=1)
        return

    # Add MACD to Display Graph
    macd_line = macd(close=stock_analysis_tool.df['close'],
                     n_slow=stock_analysis_tool.macd_slow,
                     n_fast=stock_analysis_tool.macd_fast)
    signal_line = macd_signal(close=stock_analysis_tool.df['close'],
                              n_slow=stock_analysis_tool.macd_slow,
                              n_fast=stock_analysis_tool.macd_fast,
                              n_sign=stock_analysis_tool.macd_sign)
    stock_analysis_tool.df['MACD'] = macd_line - signal_line
    stock_analysis_tool.add_column_to_graph(column_name='MACD')
# In the original layout these are indented as members of a nested class
# (presumably Ui_StockAnalysisTool.Callbacks -- confirm against the full file).
@staticmethod
def roc_cb_pressed(stock_analysis_tool: QMainWindow,
                   cb: QCheckBox) -> None:
    """
    Add or remove the 'roc' column and its graph line.

    :param stock_analysis_tool: object providing ``df``, ``roc_n``,
        ``add_column_to_graph`` and ``remove_column_from_graph``
    :param cb: checked adds the column and line, unchecked removes them
    :return:
    """
    if not cb.isChecked():
        # Remove ROC from Display Graph
        stock_analysis_tool.remove_column_from_graph(column_name='roc')
        stock_analysis_tool.df = stock_analysis_tool.df.drop("roc", axis=1)
        return

    # Add ROC to Display Graph
    roc_series = roc(close=stock_analysis_tool.df['close'],
                     n=stock_analysis_tool.roc_n)
    stock_analysis_tool.df['roc'] = roc_series
    stock_analysis_tool.add_column_to_graph(column_name='roc')


@staticmethod
def wma_cb_pressed(cb: QCheckBox) -> None:
    """
    Placeholder for the WMA checkbox: reads the state but does nothing yet.

    :param cb: checkbox queried via ``isChecked()``
    :return:
    """
    if not cb.isChecked():
        # Removing WMA from the display graph is not implemented.
        return
    # Adding WMA to the display graph is not implemented.


@staticmethod
def ema_cb_pressed(cb: QCheckBox) -> None:
    """
    Placeholder for the EMA checkbox: reads the state but does nothing yet.

    :param cb: checkbox queried via ``isChecked()``
    :return:
    """
    if not cb.isChecked():
        # Removing EMA from the display graph is not implemented.
        return
    # Adding EMA to the display graph is not implemented.


@staticmethod
def sma_cb_pressed(cb: QCheckBox) -> None:
    """
    Placeholder for the SMA checkbox: reads the state but does nothing yet.

    :param cb: checkbox queried via ``isChecked()``
    :return:
    """
    if not cb.isChecked():
        # Removing SMA from the display graph is not implemented.
        return
    # Adding SMA to the display graph is not implemented.


@staticmethod
def hma_cb_pressed(cb: QCheckBox) -> None:
    """
    Placeholder for the HMA checkbox: reads the state but does nothing yet.

    :param cb: checkbox queried via ``isChecked()``
    :return:
    """
    if not cb.isChecked():
        # Removing HMA from the display graph is not implemented.
        return
    # Adding HMA to the display graph is not implemented.
# In the original layout these are indented as members of a nested class
# (presumably Ui_StockAnalysisTool.Callbacks -- confirm against the full file).
@staticmethod
def trix_cb_pressed(cb: QCheckBox) -> None:
    """
    Placeholder for the TRIX checkbox: reads the state but does nothing yet.

    :param cb: checkbox queried via ``isChecked()``
    :return:
    """
    if not cb.isChecked():
        # Removing TRIX from the display graph is not implemented.
        return
    # Adding TRIX to the display graph is not implemented.


@staticmethod
def cci_cb_pressed(stock_analysis_tool: QMainWindow,
                   cb: QCheckBox) -> None:
    """
    Add or remove the 'cci' column and its graph line.

    :param stock_analysis_tool: object providing ``df``, ``cci_n``,
        ``add_column_to_graph`` and ``remove_column_from_graph``
    :param cb: checked adds the column and line, unchecked removes them
    :return:
    """
    if not cb.isChecked():
        # Remove CCI from Display Graph
        stock_analysis_tool.remove_column_from_graph(column_name='cci')
        stock_analysis_tool.df = stock_analysis_tool.df.drop("cci", axis=1)
        return

    # Add CCI to Display Graph
    cci_series = cci(close=stock_analysis_tool.df['close'],
                     low=stock_analysis_tool.df['low'],
                     high=stock_analysis_tool.df['high'],
                     n=stock_analysis_tool.cci_n)
    stock_analysis_tool.df['cci'] = cci_series
    stock_analysis_tool.add_column_to_graph(column_name='cci')


@staticmethod
def dpo_cb_pressed(stock_analysis_tool: QMainWindow,
                   cb: QCheckBox) -> None:
    """
    Add or remove the 'dpo' column and its graph line.

    :param stock_analysis_tool: object providing ``df``, ``dpo_n``,
        ``add_column_to_graph`` and ``remove_column_from_graph``
    :param cb: checked adds the column and line, unchecked removes them
    :return:
    """
    if not cb.isChecked():
        # Remove DPO from Display Graph
        stock_analysis_tool.remove_column_from_graph(column_name='dpo')
        stock_analysis_tool.df = stock_analysis_tool.df.drop("dpo", axis=1)
        return

    # Add DPO to Display Graph
    dpo_series = dpo(close=stock_analysis_tool.df['close'],
                     n=stock_analysis_tool.dpo_n)
    stock_analysis_tool.df['dpo'] = dpo_series
    stock_analysis_tool.add_column_to_graph(column_name='dpo')
# In the original layout the three callbacks below are indented as members of
# a nested class (presumably Ui_StockAnalysisTool.Callbacks -- confirm against
# the full file).
@staticmethod
def cmf_cb_pressed(stock_analysis_tool: QMainWindow,
                   cb: QCheckBox) -> None:
    """
    Add or remove the 'cmf' column and its graph line.

    :param stock_analysis_tool: object providing ``df``, ``cmf_n``,
        ``add_column_to_graph`` and ``remove_column_from_graph``
    :param cb: checked adds the column and line, unchecked removes them
    :return:
    """
    if not cb.isChecked():
        # Remove CMF from Display Graph
        stock_analysis_tool.remove_column_from_graph(column_name='cmf')
        stock_analysis_tool.df = stock_analysis_tool.df.drop("cmf", axis=1)
        return

    # Add CMF to Display Graph
    cmf_series = chaikin_money_flow(high=stock_analysis_tool.df['high'],
                                    low=stock_analysis_tool.df['low'],
                                    close=stock_analysis_tool.df['close'],
                                    volume=stock_analysis_tool.df['volume'],
                                    n=stock_analysis_tool.cmf_n)
    stock_analysis_tool.df['cmf'] = cmf_series
    stock_analysis_tool.add_column_to_graph(column_name='cmf')


@staticmethod
def adx_cb_pressed(stock_analysis_tool: QMainWindow,
                   cb: QCheckBox) -> None:
    """
    Add or remove the 'adx' column and its graph line.

    :param stock_analysis_tool: object providing ``df``, ``adx_n``,
        ``add_column_to_graph`` and ``remove_column_from_graph``
    :param cb: checked adds the column and line, unchecked removes them
    :return:
    """
    if not cb.isChecked():
        # Remove ADX from Display Graph
        stock_analysis_tool.remove_column_from_graph(column_name='adx')
        stock_analysis_tool.df = stock_analysis_tool.df.drop("adx", axis=1)
        return

    # Add ADX to Display Graph
    adx_series = adx(high=stock_analysis_tool.df['high'],
                     low=stock_analysis_tool.df['low'],
                     close=stock_analysis_tool.df['close'],
                     n=stock_analysis_tool.adx_n)
    stock_analysis_tool.df['adx'] = adx_series
    stock_analysis_tool.add_column_to_graph(column_name='adx')


@staticmethod
def force_index_cb_pressed(cb: QCheckBox) -> None:
    """
    Placeholder for the Force Index checkbox: reads the state but does nothing yet.

    :param cb: checkbox queried via ``isChecked()``
    :return:
    """
    if not cb.isChecked():
        # Removing the Force Index from the display graph is not implemented.
        return
    # Adding the Force Index to the display graph is not implemented.


# The three enums below are referenced elsewhere as attributes of
# Ui_StockAnalysisTool (e.g. Ui_StockAnalysisTool.TimeDeltas.ONE_YEAR); their
# nesting is flattened in this view.
class StockSources(IntEnum):
    # Member order matters: __init__ takes the first member as the default source.
    YAHOO_FIN = 0
    ASTRASS = 1
    YFINANCE = 2


class SampleRates(Enum):
    # Values are the display strings collected into `sample_rates` in __init__.
    ONE_DAY = "1 Day"
    ONE_HOUR = "1 Hour"
    THIRTY_MINUTES = "30 Minutes"


class TimeDeltas(Enum):
    # Values are the display strings collected into `time_deltas` in __init__.
    MAX = "MAX"
    FIVE_YEARS = "5 years"
    ONE_YEAR = "1 year"
    YEAR_TO_DATE = "YTD"
    SIX_MONTHS = "6 months"
    ONE_MONTH = "1 month"
    FIVE_DAYS = "5 days"


if __name__ == "__main__":
    import sys

    app = QApplication(sys.argv)
    # Two directory levels up, each followed by the OS path separator.
    path_correction = ".." + sep + ".." + sep
    sat = Ui_StockAnalysisTool(relative_path_correction=path_correction)
    sat.show()
    app.setStyleSheet(load_stylesheet(qt_api="pyqt5"))
    # NOTE(review): the visible source is cut off after this call; no
    # event-loop start is visible here -- confirm against the full file.
Source:com.py  
# Module state set by `intiialize` and checked by `com_program`.
initialized: bool = False
stack_limit: int


def intiialize(_stack_limit: int) -> None:
    """Store the stack limit and mark the module as initialized.

    :param _stack_limit: value copied into the module-level ``stack_limit``
    :return: None
    """
    global initialized, stack_limit
    stack_limit = _stack_limit
    initialized = True


# Correctly spelled alias. The assertion in `com_program` tells callers to use
# `initialize`; the misspelled original name is kept so existing callers work.
initialize = intiialize

_UNUSED_KEYWORDS: int = 6 # import is dealt with elsewhere, params not used yet

# Command templates with one `%s` placeholder each.
# NOTE(review): presumably filled with the output file stem -- confirm at the use site.
_BUILD_WIN10: str = 'ml /c /coff /Cp %s.asm'
_LINK_WIN10: str = 'link /subsystem:console %s.obj'
_EXECUTE_WIN10: str = '%s.exe'
_DATA_ESCAPE_SEQUENCE: str = '|!DATA!|'
_VARIABLE_TYPE = 'dword'
_BYTE_SIZE: dict[str, int] = {
    'win10,x86': 64,
}
# Maps the byte value of the letter after a backslash to the byte it stands for
# (t -> 9, n -> 10, r -> 13).
_ESCAPE_TABLE: dict[int, int] = {
    ord('t'): 9,
    ord('n'): 10,
    ord('r'): 13,
}


class BlockType(Enum):
    """Kind of control-flow block the code generator is currently inside."""
    NONE    = auto()
    IF      = auto()
    ELSEIF  = auto()
    ELSE    = auto()
    WHILE   = auto()


@dataclass
class CodeBody:
    """Accumulates generated text plus numbered side buffers.

    ``code_body`` is the emitted text so far, ``buffer`` holds deferred
    snippets by slot index, and ``indent`` is the current nesting level
    (four spaces per level).
    """
    code_body: str = ''
    buffer: list[str] = field(default_factory=list)
    indent: int = 0

    def __get_indent(self) -> str:
        """Return the leading whitespace for the current indent level."""
        assert self.indent >= 0, 'impossible, error in com.py in `__com_program_win10`'
        return ' ' * 4 * self.indent

    def write_no_indent(self, code: str) -> None:
        """Append ``code`` verbatim, with no indent and no newline."""
        self.code_body += code

    def write(self, code: str) -> None:
        """Append the current indent followed by ``code`` (no newline)."""
        self.code_body += self.__get_indent() + code

    def writel(self, code: str) -> None:
        """Append one indented line: indent, ``code``, newline."""
        self.code_body += self.__get_indent() + code + '\n'

    def writecl(self, code: str) -> None:
        """Append a blank line, then one indented line (used for section comments)."""
        self.code_body += '\n' + self.__get_indent() + code + '\n'

    def write_buffer(self, code: str, i: int=0) -> None:
        """Append ``code`` to buffer slot ``i``, creating empty slots up to ``i`` as needed."""
        missing_slots = i + 1 - len(self.buffer)
        if missing_slots > 0:
            self.buffer.extend([''] * missing_slots)
        self.buffer[i] += code

    def dump_buffer(self, i: int=0) -> None:
        """Flush buffer slot ``i`` into ``code_body`` (prefixed by the indent) and clear it.

        Raises ``IndexError`` if slot ``i`` was never created by ``write_buffer``.
        """
        self.code_body += self.__get_indent() + self.buffer[i]
        self.buffer[i] = ''


def com_program(program: Program, outfile: str) -> None:
    """Compile ``program`` via ``__com_program_win10_x86``.

    :param program: forwarded unchanged to the backend
    :param outfile: forwarded unchanged to the backend
    :return: None
    """
    assert initialized, '`initialize` must be called to use com.py'
    __com_program_win10_x86(program, outfile)
__com_program_win10_x86(program, outfile)58def __com_program_win10_x86(program: Program, outfile: str, compile: bool=True, debug_output:bool=False) -> Union[str, None]:59    assert len(OperationType) == 8, 'Unhandled members of `OperationType`'60    assert len(Keyword) == 11 + _UNUSED_KEYWORDS, 'Unhandled members of `Keyword`'61    assert len(Intrinsic) == 22, 'Unhandled members of `Intrinsic`'62    63    debug_output = True64    # compile = False65    global ifblock_c66    ifblock_c = 067    global wblock_c68    wblock_c = 069    vars: list[Variable] = []70    def escaped_repr(b_str: bytes) -> list[str]:71        chs: list[str] = []72        escaped: bool = False73        for i, b in enumerate(b_str):74            if escaped:75                escaped = False76                continue77            if b != ord('\\'):78                chs.append(str(b))79                continue80            assert i < len(b_str) - 1, 'Unfinished escape sequence'81            esc_ch = b_str[i + 1]82            chs.append(str(_ESCAPE_TABLE[esc_ch]))83            escaped = True84        return chs85    def generate_code_segment(operations: list[Operation], 86                              depth_map: dict[int, int],87                              block_depth: int=0,88                              strs: list[bytes]=[],89                              indent:int=190    ) -> str:91        global ifblock_c92        global wblock_c93        cb = CodeBody()94        cb.indent = indent95        elseif_c: int = 096        cblock: BlockType = BlockType.NONE97        while_cond: list[Operation] = []98        for operation in operations:99            if cblock == BlockType.WHILE:100                while_cond.append(operation)101            else:102                while_cond.clear()103            if operation.type == OperationType.PUSH_INT:104                assert isinstance(operation.operand, int), 'Error in tparser.py in `program_from_tokens` or tokenizer.py in `tokenize_src`'105                
cb.writecl(';; --- Push INT [%i] ---;;' % operation.operand)106                cb.writel('mov eax, %i' % operation.operand)107                cb.writel('push eax')108            if operation.type == OperationType.PUSH_BOOL:109                assert isinstance(operation.operand, int) and 0 <= operation.operand <= 1, 'Error in tparser.py in `program_from_tokens` or tokenizer.py in `tokenize_src`'110                cb.writecl(';; --- Push BOOL [%i] ---;;' % operation.operand)111                cb.writel('mov eax, %i' % operation.operand)112                cb.writel('push eax')113            if operation.type == OperationType.PUSH_STR:114                # TODO: fix in statements115                assert isinstance(operation.operand, str), 'Error in tparser.py in `program_from_tokens` or tokenizer.py in `tokenize_src`'116                cb.writecl(';; --- Push STR [%s] ---;;' % operation.operand)117                encoded = operation.operand.encode('utf-8')118                size = len(encoded)119                exists = encoded in strs120                cb.writel('mov eax, %i' % size)121                cb.writel('push eax')122                cb.writel('push offset str_%i' % (len(strs) if not exists else strs.index(encoded)))123                if not exists:124                    strs.append(encoded)125            elif operation.type == OperationType.VAR_REF:126                assert isinstance(operation.operand, str), 'Error in tparser.py in `program_from_tokens` or tokenizer.py in `tokenize_src`'127                name, typ, func_param = operation.operand.split('/')128                cb.writecl(';; --- Push Variable to Stack [%s] ---;;' % name)129                if func_param == 'f':130                    if typ == 'val':131                        cb.writel('mov eax, _%s' % name)132                        cb.writel('mov edx, [eax]')133                    elif typ == 'ref':134                        cb.writel('mov edx, _%s' % name)135                    else:136        
                assert False, 'Error in tparser.py in `program_from_tokens` or tokenizer.py in `tokenize_src`'137                    cb.writel('push edx')138                else:139                    cb.writel('mov eax, _%s' % name)140                    cb.writel('push eax')141            elif operation.type == OperationType.PUSH_VAR_REF:142                assert isinstance(operation.operand, str), 'Error in tparser.py in `program_from_tokens` or tokenizer.py in `tokenize_src`'143                name, typ, func_param = operation.operand.split('/')144                cb.writecl(';; --- Push Variable Reference to Stack [%s] ---;;' % name)145                if func_param == 'f':146                    if typ == 'val':147                        cb.writel('mov eax, _%s' % name)148                    elif typ == 'ref':149                        cb.writel('mov eax, offset _%s' % name)150                    else:151                        assert False, 'Error in tparser.py in `program_from_tokens` or tokenizer.py in `tokenize_src`'152                    cb.writel('push eax')153                else:154                    # Cannot get reference of function parameter155                    assert False, 'Error in tparser.py in `program_from_tokens` or tokenizer.py in `tokenize_src`'156            elif operation.type == OperationType.FUNC_CALL:157                assert isinstance(operation.operand, int), 'Error in tparser.py in `program_from_tokens` or tokenizer.py in `tokenize_src`'158                func: Func = program.funcs[operation.operand]159                cb.writecl(';; --- Call Func [%s] ---;;' % func.name)160                cb.writel('call _%s' % func.name)161                if func.rets:162                    cb.writel(';; --- Push Return Value Onto Stack ---;;')163                    cb.writel('push eax')164            elif operation.type == OperationType.WRITE_STACK_SIZE:165                cb.writecl(';; --- Write Stack Size to `stacksize` Variable ---;;')166      
          cb.writel('mov eax, ebp')167                cb.writel('mov ebx, esp')168                cb.writel('sub eax, ebx')169                cb.writel('mov ebx, offset stacksize')170                cb.writel('mov [ebx], eax')171            elif operation.type == OperationType.PUSH_STACK_SIZE:172                cb.writecl(';; --- Push Stack Size to Stack ---;;')173                cb.writel('mov eax, ebp')174                cb.writel('mov ebx, esp')175                cb.writel('sub eax, ebx')176                cb.writel('push eax')177            elif operation.type == Keyword.LET:178                assert isinstance(operation.operand, str), 'Error in tparser.py in `program_from_tokens` or tokenizer.py in `tokenize_src`'179                value, name = (operation.operand.split('/'))180                assert IS_INT(value), 'Error in tparser.py in `program_from_tokens`'181                value = int(value)182                cb.writecl(';; --- Allocate 1 Bytes of Data for [%s] ---;;' % name)183                cb.writel('invoke crt_malloc, 1')184                cb.writel('mov _%s, eax' % name)185                cb.writel('mov ebx, %i' % value)186                cb.writel('mov [eax], ebx')187                vars.append(Variable(name=name, value=value, type=DataType.INT))188            elif operation.type == Keyword.LETMEM:189                assert isinstance(operation.operand, str), 'Error in tparser.py in `program_from_tokens` or tokenizer.py in `tokenize_src`'190                value, name = (operation.operand.split('/'))191                assert IS_INT(value), 'Error in tparser.py in `program_from_tokens`'192                value = int(value)193                cb.writecl(';; --- Allocate %i Bytes of Data for [%s]  ---;;' % (value, name))194                cb.writel('invoke crt_malloc, %i' % value)195                cb.writel('mov _%s, eax' % name)196                cb.writel('mov ebx, 0')197                cb.writel('mov [eax], ebx')198                
vars.append(Variable(name=name, value=value, type=DataType.INT))199            elif operation.type == Keyword.IF:200                cb.writecl(';; --- Check Condition for _if_%i --- ;;' % ifblock_c)201                cb.write_buffer('\n    ;; --- Jump to _if_%i if True --- ;;'% ifblock_c)202                cb.write_buffer('\n    pop eax\n')203                cb.write_buffer('    mov ebx, 1\n')204                cb.write_buffer('    cmp eax, ebx\n')205                cb.write_buffer('    je _if_%i\n' % ifblock_c)206                cblock = BlockType.IF207                depth_map[block_depth] = ifblock_c208                block_depth += 1209                ifblock_c += 1210            elif operation.type == Keyword.ELSE:211                cb.code_body = cb.code_body.replace(212                    ';; --- Otherwise Jump to _endif_%i --- ;;' % depth_map[block_depth - 1],213                    ';; --- Otherwise Jump to _else_%i --- ;;' % depth_map[block_depth - 1], 1214                )215                cb.buffer[1] = cb.buffer[1].replace('jmp _endif_%i' % depth_map[block_depth - 1], 'jmp _else_%i' % depth_map[block_depth - 1], 1)216                cb.write_buffer('\n_else_%i:\n' % depth_map[block_depth - 1], 1)217                block_body = ''218                if len(operation.args) > 0:219                    assert isinstance(operation.args[0], Operation), 'Error in tparser.py in `program_from_tokens`'220                    block_body = generate_code_segment(operation.args, depth_map, 221                        block_depth=block_depth, 222                        strs=strs223                    )224                cb.write_buffer(block_body, 1)225                cblock = BlockType.ELSE226            elif operation.type == Keyword.ELSEIF:227                cb.writecl(';; --- Check Condition for _elseif_%i_%i --- ;;' % (depth_map[block_depth - 1], elseif_c))228                cb.write_buffer('\n    ;; --- Jump to _elseif_%i_%i if True --- ;;' % (depth_map[block_depth 
- 1], elseif_c))229                cb.write_buffer('\n    pop eax\n')230                cb.write_buffer('    mov ebx, 1\n')231                cb.write_buffer('    cmp eax, ebx\n')232                cb.write_buffer('    je _elseif_%i_%i\n' % (depth_map[block_depth - 1], elseif_c))233                elseif_c += 1234                cblock = BlockType.ELSEIF235            elif operation.type == Keyword.WHILE:236                cb.writecl(';; --- Check Condition for _while_%i --- ;;' % wblock_c)237                cb.write_buffer('\n    ;; --- Jump to _while_%i if True --- ;;' % wblock_c, 1)238                cb.write_buffer('\n    pop eax\n', 1)239                cb.write_buffer('    mov ebx, 1\n', 1)240                cb.write_buffer('    cmp eax, ebx\n', 1)241                cb.write_buffer('    je _while_%i\n' % wblock_c, 1)242                cb.write_buffer('\n    ;; --- Otherwise Jump to _endw_%i if True --- ;;' % wblock_c, 1)243                cb.write_buffer('\n    jmp _endw_%i\n' % wblock_c, 1)244                cblock = BlockType.WHILE245                depth_map[block_depth] = wblock_c246                block_depth += 1247                wblock_c += 1248            elif operation.type == Keyword.DO:249                cb.write_buffer('\n_while_%i:\n' % depth_map[block_depth - 1], 1)250                block_body = generate_code_segment(operation.args, depth_map, 251                    block_depth=block_depth, 252                    strs=strs253                )254                cb.write_buffer(block_body, 1)255                condition_body = generate_code_segment(while_cond[:-1], depth_map, 256                    block_depth=block_depth, 257                    strs=strs258                )259                cb.write_buffer(condition_body, 1)260                cb.write_buffer('\n    ;; --- Jump to _while_%i if True --- ;;'% depth_map[block_depth - 1], 1)261                cb.write_buffer('\n    pop eax\n', 1)262                cb.write_buffer('    mov ebx, 
1\n', 1)263                cb.write_buffer('    cmp eax, ebx\n', 1)264                cb.write_buffer('    je _while_%i\n' % depth_map[block_depth - 1], 1)265            elif operation.type == Keyword.THEN:266                cb.dump_buffer()267                block_body = ''268                if len(operation.args) > 0:269                    assert isinstance(operation.args[0], Operation), 'Error in tparser.py in `program_from_tokens`'270                    block_body = generate_code_segment(operation.args, depth_map, 271                        block_depth=block_depth, 272                        strs=strs273                    )274                if cblock == BlockType.IF:275                    cb.write_buffer('\n    ;; --- Otherwise Jump to _endif_%i --- ;;' % depth_map[block_depth - 1], 1)276                    cb.write_buffer('\n    jmp _endif_%i\n' % depth_map[block_depth - 1], 1)277                    cb.write_buffer('\n_if_%i: ; depth: %i\n' % (depth_map[block_depth - 1] , block_depth), 1)278                    cb.write_buffer(block_body, 1)279                elif cblock == BlockType.ELSEIF:280                    cb.write_buffer('\n_elseif_%i_%i:\n' % (depth_map[block_depth - 1], elseif_c - 1), 1)281                    cb.write_buffer(block_body, 1)282                else:283                    assert False, 'impossible'284                cb.write_buffer('\n    ;; --- Jump Out of the IF-ELSEIF-ELSE Statement --- ;;', 1)285                cb.write_buffer('\n    jmp _endif_%i\n' % depth_map[block_depth - 1], 1)286            elif operation.type == Keyword.END:287                while_cond.clear()288                cb.dump_buffer(1)289                if cblock == BlockType.WHILE:290                    cb.writel('\n_endw_%i:' % depth_map[block_depth - 1])291                elif cblock != BlockType.NONE:292                    cb.writel('\n_endif_%i:' % depth_map[block_depth - 1])293                else:294                    assert False, 'impossible'295           
     elseif_c = 0296                block_depth -= 1297            elif operation.type == Keyword.COUNTER:298                assert isinstance(operation.operand, int), 'Error in tparser.py in `program_from_tokens`'299                cb.writecl(';; --- Push INT from Internal Counter [%i] ---;;' % operation.operand)300                cb.writel('mov eax, %i' % operation.operand)301                cb.writel('push eax')302            elif operation.type == Keyword.RESET:303                assert isinstance(operation.operand, int), 'Error in tparser.py in `program_from_tokens`'304                cb.writecl(';; --- Push INT from Internal Counter, Also Resets [%i] ---;;' % operation.operand)305                cb.writel('mov eax, %i' % operation.operand)306                cb.writel('push eax')307            elif operation.type == Keyword.PARAMSPLIT:308                assert False, 'Error in tparser.py in `program_from_tokens` or tokenizer.py in `tokenize_src`'309            elif operation.type == Keyword.RETURN:310                assert isinstance(operation.operand, int) and 0 <= operation.operand <= 1, 'Error in tparser.py in `program_from_tokens` or tokenizer.py in `tokenize_src`'311                cb.writecl(';; --- Return Move Top Value of Stack into `eax` to Return ---;;')312                cb.writel('pop eax')313                cb.writel('ret')314            elif operation.type == Keyword.RETURNNONE:315                assert isinstance(operation.operand, int) and 0 <= operation.operand <= 1, 'Error in tparser.py in `program_from_tokens` or tokenizer.py in `tokenize_src`'316                cb.writecl(';; --- Set `eax` to 0 and Return ---;;')317                cb.writel('mov eax, 0')318                cb.writel('ret')319            elif operation.type == Intrinsic.PLUS:320                cb.writecl(';; --- PLUS --- ;;')321                cb.writel('pop eax')322                cb.writel('pop ebx')323                cb.writel('add eax, ebx')324                
cb.writel('push eax')325            elif operation.type == Intrinsic.MINUS:326                cb.writecl(';; --- PLUS --- ;;')327                cb.writel('pop eax')328                cb.writel('pop ebx')329                cb.writel('sub eax, ebx')330                cb.writel('push eax')331            elif operation.type == Intrinsic.MULTIPLY:332                cb.writecl(';; --- MULTIPLY --- ;;')333                cb.writel('pop eax')334                cb.writel('pop ebx')335                cb.writel('mul ebx')336                cb.writel('push eax')337            elif operation.type == Intrinsic.DIVMOD:338                cb.writecl(';; --- DIVMOD --- ;;')339                cb.writel('mov edx, 0')340                cb.writel('pop eax')341                cb.writel('pop ecx')342                cb.writel('div ecx')343                cb.writel('push eax')344                cb.writel('push edx')345            elif operation.type == Intrinsic.PRINT:346                cb.writecl(';; --- PRINT --- ;;')347                cb.writel('pop eax')348                cb.writel('printf("%i\\n", eax)')349            elif operation.type == Intrinsic.SWAP:350                cb.writecl(';; --- SWAP --- ;;')351                cb.writel('pop eax')352                cb.writel('pop ebx')353                cb.writel('push eax')354                cb.writel('push ebx')355            elif operation.type == Intrinsic.EQUALS:356                cb.writecl(';; --- EQUALS --- ;;')357                cb.writel('mov ecx, 0')358                cb.writel('mov edx, 1')359                cb.writel('pop eax')360                cb.writel('pop ebx')361                cb.writel('cmp eax, ebx')362                cb.writel('cmove ecx, edx')363                cb.writel('push ecx')364            elif operation.type == Intrinsic.LESS:365                cb.writecl(';; --- LESS --- ;;')366                cb.writel('mov ecx, 0')367                cb.writel('mov edx, 1')368                cb.writel('pop eax')369       
         cb.writel('pop ebx')370                cb.writel('cmp ebx, eax')371                cb.writel('cmovl ecx, edx')372                cb.writel('push ecx')373            elif operation.type == Intrinsic.GREATER:374                cb.writecl(';; --- GREATER --- ;;')375                cb.writel('mov ecx, 0')376                cb.writel('mov edx, 1')377                cb.writel('pop eax')378                cb.writel('pop ebx')379                cb.writel('cmp ebx, eax')380                cb.writel('cmovg ecx, edx')381                cb.writel('push ecx')382            elif operation.type == Intrinsic.DUP:383                cb.writecl(';; --- DUP --- ;;')384                cb.writel('pop eax')385                cb.writel('push eax')386                cb.writel('push eax')387            elif operation.type == Intrinsic.DROP:388                cb.writecl(';; --- DROP --- ;;')389                cb.writel('pop eax')390            elif operation.type == Intrinsic.STORE: 391                cb.writecl(';; --- STORE --- ;;')392                cb.writel('pop eax') 393                cb.writel('pop ebx') 394                cb.writel('mov [ebx], eax')395            elif operation.type == Intrinsic.READ:396                cb.writecl(';; --- READ --- ;;')397                cb.writel('pop eax')398                cb.writel('mov ebx, [eax]')399                cb.writel('push ebx')400            elif operation.type == Intrinsic.INC:401                cb.writecl(';; --- INCREMENT --- ;;')402                cb.writel('pop eax')403                cb.writel('inc eax')404                cb.writel('push eax')405            elif operation.type == Intrinsic.DEC:406                cb.writecl(';; --- DECREMENT --- ;;')407                cb.writel('pop eax')408                cb.writel('dec eax')409                cb.writel('push eax')410            elif operation.type == Intrinsic.STDOUT:411                cb.writecl(';; --- Prints From Top of Stack to StdOut --- ;;')412                
cb.writel('pop eax')413                cb.writel('invoke StdOut, eax')414                cb.writel('pop eax')415            elif operation.type == Intrinsic.STDERR:416                assert False, 'STDERR'417            elif operation.type == Intrinsic.BYTE:418                size = _BYTE_SIZE['win10,x86']419                cb.writecl(';; --- Push Byte Size [%i] ---;;' % size)420                cb.writel('mov eax, %i' % size)421                cb.writel('push eax')422            elif operation.type == Intrinsic.AND:423                cb.writecl(';; --- AND -- ;;')424                cb.writel('pop eax')425                cb.writel('pop ebx')426                cb.writel('and eax, ebx')427                cb.writel('push eax')428            elif operation.type == Intrinsic.OR:429                cb.writecl(';; --- OR -- ;;')430                cb.writel('pop eax')431                cb.writel('pop ebx')432                cb.writel('or eax, ebx')433                cb.writel('push eax')434            elif operation.type == Intrinsic.XOR:435                cb.writecl(';; --- XOR -- ;;')436                cb.writel('pop eax')437                cb.writel('pop ebx')438                cb.writel('xor eax, ebx')439                cb.writel('push eax')440            elif operation.type == Intrinsic.NOT:441                cb.writecl(';; --- NOT -- ;;')442                cb.writel('pop eax')443                cb.writel('not eax')444                cb.writel('push eax')445            446        return cb.code_body447    448    cb: CodeBody = CodeBody()449    strs: list[bytes] = []450    depth_map: dict[int, int] = {}451    452    cb.writel(';; Necessary initialization statements ;;')453    cb.writel('.686')454    cb.writel('.model flat, stdcall')455    cb.writel('assume fs:nothing\n')456    cb.writel(';; Necessary include statments ;;')457    cb.writel('include /masm32/macros/macros.asm')458    cb.writel('include /masm32/include/kernel32.inc')459    cb.writel('include 
/masm32/include/msvcrt.inc')460    cb.writel('include /masm32/include/masm32.inc')461    cb.writel('includelib /masm32/lib/kernel32')462    cb.writel('includelib /masm32/lib/msvcrt')463    cb.writel('includelib /masm32/lib/masm32')464    cb.writel(_DATA_ESCAPE_SEQUENCE)465    cb.writel('\n;; --- Code Body ---;;')466    cb.writel('.code')467     468    for func in program.funcs:469        cb.write('\n')470        cb.writel('_%s proc %s' % (func.name, ', '.join(f'_{param}: dword' for param in func.params)))471        func_body = generate_code_segment(func.operations, depth_map, 472            block_depth=1, 473            strs=strs474        )475        cb.writel(func_body)476        cb.writel('_%s endp' % func.name)477    478    479    cb.writel('\nstart:')480    main_body = generate_code_segment(program.operations, depth_map, 481        block_depth=0, 482        strs=strs483    )484    cb.writel(main_body)485    data_str: str = ''486    data_str += '\n;; --- Data Declarations --- ;;'487    data_str += '\n.data'488    data_str += '\n\n;; --- Default Program Data --- ;;'489    data_str += '\nstacksize dword 0'490    data_str += '\n\n;; --- String Literal Data --- ;;'491    for i, Str in enumerate(strs):492        data_str += '\nstr_%i db %s' % (i, ', '.join(escaped_repr(Str)))493    data_str += '\n\n;; --- Uninitialized Data Declarations --- ;;'494    data_str += '\n.data?'495    data_str += '\n\n;; --- Variable Data --- ;;'496    for i, Var in enumerate(vars):497        data_str += '\n_%s %s ?' 
% (Var.name, _VARIABLE_TYPE)498    cb.code_body = cb.code_body.replace(_DATA_ESCAPE_SEQUENCE, data_str)499    cb.indent = 0500    cb.writel(';; Ends the program ;;')501    cb.writel('invoke ExitProcess, 0')502    cb.writel('end start')503    cb.writel('end')504    if debug_output:505        with open(f'{outfile}_ops.txt', 'w') as f:506            for op in program.operations:507                f.write('%s\n' % op.to_str())508        with open(f'{outfile}_vars.txt', 'w') as f:509            for var in vars:510                f.write('%s\n' % var.__str__())511        with open(f'{outfile}_funcs.txt', 'w') as f:512            for func in program.funcs:513                f.write('%s\n' % func.__str__())514    if compile:515        with open(os.path.join(os.getcwd(), f'{outfile}.asm'), 'w') as out:516            out.write(cb.code_body)517    else:518        pass519        # return cb.code_body520    filepath = os.path.join(os.getcwd(), outfile)521    os.system(_BUILD_WIN10 % filepath)522    filepath = os.path.join(os.getcwd(), outfile.split('/')[-1])523    os.system(_LINK_WIN10 % filepath)...test_key.py
Source:test_key.py  
# -*- coding: utf-8 -*-
# Copyright (c) 2012 Mitch Garnaat http://garnaat.org/
# All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.

"""
Some unit tests for S3 Key
"""

from tests.unit import unittest
import time
import random

import boto.s3
from boto.compat import six, StringIO, urllib
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from boto.exception import S3ResponseError


class S3KeyTest(unittest.TestCase):
    """Exercise ``Key`` uploads, downloads, callbacks, redirects and metadata.

    Every test runs against a bucket that ``setUp`` creates through
    ``S3Connection()`` and that ``tearDown`` empties and deletes again.
    """

    # NOTE(review): presumably a marker the test runner uses to select S3
    # tests -- nothing in this module reads it; confirm against the runner.
    s3 = True

    def setUp(self):
        self.conn = S3Connection()
        random.seed()
        # Timestamp plus a random suffix keeps bucket names distinct
        # between runs.
        self.bucket_name = 'keytest-%d-%d' % (
            time.time(), random.randint(1, 99999999))
        self.bucket = self.conn.create_bucket(self.bucket_name)

    def tearDown(self):
        # The bucket is emptied first, then deleted.
        for key in self.bucket:
            key.delete()
        self.bucket.delete()

    def test_set_contents_from_file_dataloss(self):
        # Create an empty stringio and write to it.
        content = "abcde"
        sfp = StringIO()
        sfp.write(content)
        # Try set_contents_from_file() without rewinding sfp
        k = self.bucket.new_key("k")
        try:
            k.set_contents_from_file(sfp)
            self.fail("forgot to rewind so should fail.")
        except AttributeError:
            pass
        # call with rewind and check if we wrote 5 bytes
        k.set_contents_from_file(sfp, rewind=True)
        self.assertEqual(k.size, 5)
        # check actual contents by getting it.
        kn = self.bucket.new_key("k")
        ks = kn.get_contents_as_string().decode('utf-8')
        self.assertEqual(ks, content)

        # finally, try with a 0 length string
        sfp = StringIO()
        k = self.bucket.new_key("k")
        k.set_contents_from_file(sfp)
        self.assertEqual(k.size, 0)
        # check actual contents by getting it.
        kn = self.bucket.new_key("k")
        ks = kn.get_contents_as_string().decode('utf-8')
        self.assertEqual(ks, "")

    def test_set_contents_as_file(self):
        content = "01234567890123456789"
        sfp = StringIO(content)

        # fp is set at 0 for just opened (for read) files.
        # set_contents should write full content to key.
        k = self.bucket.new_key("k")
        k.set_contents_from_file(sfp)
        self.assertEqual(k.size, 20)
        kn = self.bucket.new_key("k")
        ks = kn.get_contents_as_string().decode('utf-8')
        self.assertEqual(ks, content)

        # set fp to 5 and set contents. this should
        # set "567890123456789" to the key
        sfp.seek(5)
        k = self.bucket.new_key("k")
        k.set_contents_from_file(sfp)
        self.assertEqual(k.size, 15)
        kn = self.bucket.new_key("k")
        ks = kn.get_contents_as_string().decode('utf-8')
        self.assertEqual(ks, content[5:])

        # set fp to 5 and only set 5 bytes. this should
        # write the value "56789" to the key.
        sfp.seek(5)
        k = self.bucket.new_key("k")
        k.set_contents_from_file(sfp, size=5)
        self.assertEqual(k.size, 5)
        self.assertEqual(sfp.tell(), 10)
        kn = self.bucket.new_key("k")
        ks = kn.get_contents_as_string().decode('utf-8')
        self.assertEqual(ks, content[5:10])

    def test_set_contents_with_md5(self):
        content = "01234567890123456789"
        sfp = StringIO(content)

        # fp is set at 0 for just opened (for read) files.
        # set_contents should write full content to key.
        k = self.bucket.new_key("k")
        good_md5 = k.compute_md5(sfp)
        k.set_contents_from_file(sfp, md5=good_md5)
        kn = self.bucket.new_key("k")
        ks = kn.get_contents_as_string().decode('utf-8')
        self.assertEqual(ks, content)

        # set fp to 5 and only set 5 bytes. this should
        # write the value "56789" to the key.
        sfp.seek(5)
        k = self.bucket.new_key("k")
        good_md5 = k.compute_md5(sfp, size=5)
        k.set_contents_from_file(sfp, size=5, md5=good_md5)
        self.assertEqual(sfp.tell(), 10)
        kn = self.bucket.new_key("k")
        ks = kn.get_contents_as_string().decode('utf-8')
        self.assertEqual(ks, content[5:10])

        # let's try a wrong md5 by just altering it.
        k = self.bucket.new_key("k")
        sfp.seek(0)
        hexdig, base64 = k.compute_md5(sfp)
        bad_md5 = (hexdig, base64[3:])
        try:
            k.set_contents_from_file(sfp, md5=bad_md5)
            self.fail("should fail with bad md5")
        except S3ResponseError:
            pass

    def test_get_contents_with_md5(self):
        content = "01234567890123456789"
        sfp = StringIO(content)

        k = self.bucket.new_key("k")
        k.set_contents_from_file(sfp)
        kn = self.bucket.new_key("k")
        s = kn.get_contents_as_string().decode('utf-8')
        self.assertEqual(kn.md5, k.md5)
        self.assertEqual(s, content)

    def test_file_callback(self):
        # Progress callback shared by every transfer below: it counts its
        # invocations and checks that two consecutive calls never report the
        # same byte count.
        def callback(wrote, total):
            self.my_cb_cnt += 1
            self.assertNotEqual(wrote, self.my_cb_last,
                                "called twice with same value")
            self.my_cb_last = wrote

        def reset_counters():
            # Start a fresh count before each upload or download.
            self.my_cb_cnt = 0
            self.my_cb_last = None

        # Zero bytes written => 1 call
        reset_counters()
        k = self.bucket.new_key("k")
        k.BufferSize = 2
        sfp = StringIO("")
        k.set_contents_from_file(sfp, cb=callback, num_cb=10)
        self.assertEqual(self.my_cb_cnt, 1)
        self.assertEqual(self.my_cb_last, 0)
        sfp.close()

        # Read back zero bytes => 1 call
        reset_counters()
        s = k.get_contents_as_string(cb=callback)
        self.assertEqual(self.my_cb_cnt, 1)
        self.assertEqual(self.my_cb_last, 0)

        content = "01234567890123456789"
        sfp = StringIO(content)

        # expect 2 calls due start/finish
        reset_counters()
        k = self.bucket.new_key("k")
        k.set_contents_from_file(sfp, cb=callback, num_cb=10)
        self.assertEqual(self.my_cb_cnt, 2)
        self.assertEqual(self.my_cb_last, 20)

        # Read back all bytes => 2 calls
        reset_counters()
        s = k.get_contents_as_string(cb=callback).decode('utf-8')
        self.assertEqual(self.my_cb_cnt, 2)
        self.assertEqual(self.my_cb_last, 20)
        self.assertEqual(s, content)

        # rewind sfp and try upload again. -1 should call
        # for every read/write so that should make 11 when bs=2
        sfp.seek(0)
        reset_counters()
        k = self.bucket.new_key("k")
        k.BufferSize = 2
        k.set_contents_from_file(sfp, cb=callback, num_cb=-1)
        self.assertEqual(self.my_cb_cnt, 11)
        self.assertEqual(self.my_cb_last, 20)

        # Read back all bytes => 11 calls
        reset_counters()
        s = k.get_contents_as_string(cb=callback, num_cb=-1).decode('utf-8')
        self.assertEqual(self.my_cb_cnt, 11)
        self.assertEqual(self.my_cb_last, 20)
        self.assertEqual(s, content)

        # Bounded callback counts: no more than num_cb calls, and the last
        # call always reports all 20 bytes. The same upload/download round
        # trip is checked for each num_cb value.
        for num_cb in (1, 2, 3, 4, 6, 10, 1000):
            # num_cb=1 is still allowed two calls ("no more than 1 times =>
            # 2 times"), hence the floor of 2.
            limit = max(num_cb, 2)

            sfp.seek(0)
            reset_counters()
            k = self.bucket.new_key("k")
            k.BufferSize = 2
            k.set_contents_from_file(sfp, cb=callback, num_cb=num_cb)
            self.assertLessEqual(self.my_cb_cnt, limit)
            self.assertEqual(self.my_cb_last, 20)

            reset_counters()
            s = k.get_contents_as_string(
                cb=callback, num_cb=num_cb).decode('utf-8')
            self.assertLessEqual(self.my_cb_cnt, limit)
            self.assertEqual(self.my_cb_last, 20)
            self.assertEqual(s, content)

    def test_website_redirects(self):
        self.bucket.configure_website('index.html')
        key = self.bucket.new_key('redirect-key')
        self.assertTrue(key.set_redirect('http://www.amazon.com/'))
        self.assertEqual(key.get_redirect(), 'http://www.amazon.com/')

        self.assertTrue(key.set_redirect('http://aws.amazon.com/'))
        self.assertEqual(key.get_redirect(), 'http://aws.amazon.com/')

    def test_website_redirect_none_configured(self):
        key = self.bucket.new_key('redirect-key')
        key.set_contents_from_string('')
        self.assertEqual(key.get_redirect(), None)

    def test_website_redirect_with_bad_value(self):
        self.bucket.configure_website('index.html')
        key = self.bucket.new_key('redirect-key')
        with self.assertRaises(key.provider.storage_response_error):
            # Must start with a / or http
            key.set_redirect('ftp://ftp.example.org')
        with self.assertRaises(key.provider.storage_response_error):
            # Must start with a / or http
            key.set_redirect('')

    def test_setting_date(self):
        key = self.bucket.new_key('test_date')
        # This should actually set x-amz-meta-date & not fail miserably.
        key.set_metadata('date', '20130524T155935Z')
        key.set_contents_from_string('Some text here.')

        check = self.bucket.get_key('test_date')
        self.assertEqual(check.get_metadata('date'), u'20130524T155935Z')
        self.assertTrue('x-amz-meta-date' in check._get_remote_metadata())

    def test_header_casing(self):
        key = self.bucket.new_key('test_header_case')
        # Using anything but CamelCase on ``Content-Type`` or ``Content-MD5``
        # used to cause a signature error (when using ``s3`` for signing).
        key.set_metadata('Content-type', 'application/json')
        key.set_metadata('Content-md5', 'XmUKnus7svY1frWsVskxXg==')
        key.set_contents_from_string('{"abc": 123}')

        check = self.bucket.get_key('test_header_case')
        self.assertEqual(check.content_type, 'application/json')

    def test_header_encoding(self):
        key = self.bucket.new_key('test_header_encoding')

        key.set_metadata('Cache-control', u'public, max-age=500')
        key.set_metadata('Test-Plus', u'A plus (+)')
        key.set_metadata('Content-disposition', u'filename=Schöne Zeit.txt')
        key.set_metadata('Content-Encoding', 'gzip')
        key.set_metadata('Content-Language', 'de')
        key.set_metadata('Content-Type', 'application/pdf')
        self.assertEqual(key.content_type, 'application/pdf')
        key.set_metadata('X-Robots-Tag', 'all')
        key.set_metadata('Expires', u'Thu, 01 Dec 1994 16:00:00 GMT')
        key.set_contents_from_string('foo')

        check = self.bucket.get_key('test_header_encoding')
        remote_metadata = check._get_remote_metadata()

        # TODO: investigate whether encoding ' ' as '%20' makes sense
        self.assertIn(
            check.cache_control,
            ('public,%20max-age=500', 'public, max-age=500')
        )
        self.assertIn(remote_metadata['cache-control'],
                      ('public,%20max-age=500', 'public, max-age=500'))
        self.assertEqual(check.get_metadata('test-plus'), 'A plus (+)')
        self.assertEqual(check.content_disposition, 'filename=Sch%C3%B6ne%20Zeit.txt')
        self.assertEqual(remote_metadata['content-disposition'], 'filename=Sch%C3%B6ne%20Zeit.txt')
        self.assertEqual(check.content_encoding, 'gzip')
        self.assertEqual(remote_metadata['content-encoding'], 'gzip')
        self.assertEqual(check.content_language, 'de')
        self.assertEqual(remote_metadata['content-language'], 'de')
        self.assertEqual(check.content_type, 'application/pdf')
        self.assertEqual(remote_metadata['content-type'], 'application/pdf')
        self.assertEqual(check.x_robots_tag, 'all')
        self.assertEqual(remote_metadata['x-robots-tag'], 'all')
        self.assertEqual(check.expires, 'Thu,%2001%20Dec%201994%2016:00:00%20GMT')
        self.assertEqual(remote_metadata['expires'], 'Thu,%2001%20Dec%201994%2016:00:00%20GMT')

        expected = u'filename=Schöne Zeit.txt'
        if six.PY2:
            # Newer versions of python default to unicode strings, but python 2
            # requires encoding to UTF-8 to compare the two properly
            expected = expected.encode('utf-8')

        self.assertEqual(
            urllib.parse.unquote(check.content_disposition),
            expected
        )

    def test_set_contents_with_sse_c(self):
        content = "01234567890123456789"
        # the plain text of customer key is "01testKeyToSSEC!"
        header = {
            "x-amz-server-side-encryption-customer-algorithm":
             "AES256",
            "x-amz-server-side-encryption-customer-key":
             "MAAxAHQAZQBzAHQASwBlAHkAVABvAFMAUwBFAEMAIQA=",
            "x-amz-server-side-encryption-customer-key-MD5":
             "fUgCZDDh6bfEMuP2bN38mg=="
        }
        # upload and download content with AWS specified headers
        k = self.bucket.new_key("testkey_for_sse_c")
        k.set_contents_from_string(content, headers=header)
        kn = self.bucket.new_key("testkey_for_sse_c")
        ks = kn.get_contents_as_string(headers=header)
        self.assertEqual(ks, content.encode('utf-8'))


class S3KeySigV4Test(unittest.TestCase):
    """Key round trips against a bucket created in ``eu-central-1``.

    The connection comes from ``boto.s3.connect_to_region('eu-central-1')``.
    """

    def setUp(self):
        self.conn = boto.s3.connect_to_region('eu-central-1')
        self.bucket_name = 'boto-sigv4-key-%d' % int(time.time())
        self.bucket = self.conn.create_bucket(self.bucket_name,
                                              location='eu-central-1')

    def tearDown(self):
        for key in self.bucket:
            key.delete()
        self.bucket.delete()

    def test_put_get_with_non_string_headers_key(self):
        k = Key(self.bucket)
        k.key = 'foobar'
        body = 'This is a test of S3'
        # A content-length header will be added to this request since it
        # has a body.
        k.set_contents_from_string(body)
        # Set a header that has an integer. This checks for a bug where
        # the sigv4 signer assumes that all of the headers are strings.
        headers = {'Content-Length': 0}
        from_s3_key = self.bucket.get_key('foobar', headers=headers)
        self.assertEqual(from_s3_key.get_contents_as_string().decode('utf-8'),
                         body)

    def test_head_put_get_with_non_ascii_key(self):
        k = Key(self.bucket)
        # One key mixing Portuguese, Chinese, Korean and Russian text with
        # URL-special punctuation. It must stay on a single line: a line
        # break inside the triple-quoted literal would become part of the
        # key name.
        k.key = u'''pt-Olá_ch-你好_ko-안녕_ru-Здравствуйте%20,.<>~`!@#$%^&()_-+='"'''
        body = 'This is a test of S3'
        k.set_contents_from_string(body)
        from_s3_key = self.bucket.get_key(k.key, validate=True)
        self.assertEqual(from_s3_key.get_contents_as_string().decode('utf-8'),
                         body)

        keys = self.bucket.get_all_keys(prefix=k.key, max_keys=1)
        self.assertEqual(1, len(keys))


class S3KeyVersionCopyTest(unittest.TestCase):
    """Copy a non-current version of a key inside a versioned bucket."""

    def setUp(self):
        self.conn = S3Connection()
        self.bucket_name = 'boto-key-version-copy-%d' % int(time.time())
        self.bucket = self.conn.create_bucket(self.bucket_name)
        self.bucket.configure_versioning(True)

    def tearDown(self):
        # Versioning is enabled, so every version has to be deleted before
        # the bucket itself.
        for key in self.bucket.list_versions():
            key.delete()
        self.bucket.delete()

    def test_key_overwrite_and_copy(self):
        first_content = b"abcdefghijklm"
        second_content = b"nopqrstuvwxyz"
        k = Key(self.bucket, 'testkey')
        k.set_contents_from_string(first_content)
        # Wait for S3's eventual consistency (may not be necessary)
        while self.bucket.get_key('testkey') is None:
            time.sleep(5)
        # Get the first version_id
        first_key = self.bucket.get_key('testkey')
        first_version_id = first_key.version_id
        # Overwrite the key
        k = Key(self.bucket, 'testkey')
        k.set_contents_from_string(second_content)
        # Wait for eventual consistency
        while True:
            second_key = self.bucket.get_key('testkey')
            if second_key is None or second_key.version_id == first_version_id:
                time.sleep(5)
            else:
                break
        # Copy first key (no longer the current version) to a new key
        source_key = self.bucket.get_key('testkey',
                                          version_id=first_version_id)
        source_key.copy(self.bucket, 'copiedkey')
        while self.bucket.get_key('copiedkey') is None:
            time.sleep(5)
        copied_key = self.bucket.get_key('copiedkey')
        copied_key_contents = copied_key.get_contents_as_string()
        # NOTE(review): the excerpt is cut off here by the source page
        # ("..."); whatever assertions followed are not visible. The page's
        # next snippet heading, libvirt-override-virConnect.py, came next.
Source:libvirt-override-virConnect.py  
...33        """Dispatches events to python user domain event callbacks34        """35        try:36            for cb,opaque in self.domainEventCallbacks.items():37                cb(self, virDomain(self, _obj=dom), event, detail, opaque)38            return 039        except AttributeError:40            pass41    def _dispatchDomainEventLifecycleCallback(self, dom, event, detail, cbData):42        """Dispatches events to python user domain lifecycle event callbacks43        """44        cb = cbData["cb"]45        opaque = cbData["opaque"]46        cb(self, virDomain(self, _obj=dom), event, detail, opaque)47        return 048    def _dispatchDomainEventGenericCallback(self, dom, cbData):49        """Dispatches events to python user domain generic event callbacks50        """51        cb = cbData["cb"]52        opaque = cbData["opaque"]53        cb(self, virDomain(self, _obj=dom), opaque)54        return 055    def _dispatchDomainEventRTCChangeCallback(self, dom, offset, cbData):56        """Dispatches events to python user domain RTC change event callbacks57        """58        cb = cbData["cb"]59        opaque = cbData["opaque"]60        cb(self, virDomain(self, _obj=dom), offset ,opaque)61        return 062    def _dispatchDomainEventWatchdogCallback(self, dom, action, cbData):63        """Dispatches events to python user domain watchdog event callbacks64        """65        cb = cbData["cb"]66        opaque = cbData["opaque"]67        cb(self, virDomain(self, _obj=dom), action, opaque)68        return 069    def _dispatchDomainEventIOErrorCallback(self, dom, srcPath, devAlias,70                                            action, cbData):71        """Dispatches events to python user domain IO error event callbacks72        """73        cb = cbData["cb"]74        opaque = cbData["opaque"]75        cb(self, virDomain(self, _obj=dom), srcPath, devAlias, action, opaque)76        return 077    def _dispatchDomainEventIOErrorReasonCallback(self, dom, srcPath,78        
                                          devAlias, action, reason,79                                                  cbData):80        """Dispatches events to python user domain IO error event callbacks81        """82        cb = cbData["cb"]83        opaque = cbData["opaque"]84        cb(self, virDomain(self, _obj=dom), srcPath, devAlias, action,85           reason, opaque)86        return 087    def _dispatchDomainEventGraphicsCallback(self, dom, phase, localAddr,88                                            remoteAddr, authScheme, subject,89                                            cbData):90        """Dispatches events to python user domain graphics event callbacks91        """92        cb = cbData["cb"]93        opaque = cbData["opaque"]94        cb(self, virDomain(self, _obj=dom), phase, localAddr, remoteAddr,95           authScheme, subject, opaque)96        return 097    def _dispatchDomainEventBlockJobCallback(self, dom, disk, type, status, cbData):98        """Dispatches events to python user domain blockJob/blockJob2 event callbacks99        """100        try:101            cb = cbData["cb"]102            opaque = cbData["opaque"]103            cb(self, virDomain(self, _obj=dom), disk, type, status, opaque)104            return 0105        except AttributeError:106            pass107    def _dispatchDomainEventDiskChangeCallback(self, dom, oldSrcPath, newSrcPath, devAlias, reason, cbData):108        """Dispatches event to python user domain diskChange event callbacks109        """110        cb = cbData["cb"]111        opaque = cbData["opaque"]112        cb(self, virDomain(self, _obj=dom), oldSrcPath, newSrcPath, devAlias, reason, opaque)113        return 0114    def _dispatchDomainEventTrayChangeCallback(self, dom, devAlias, reason, cbData):115        """Dispatches event to python user domain trayChange event callbacks116        """117        cb = cbData["cb"]118        opaque = cbData["opaque"]119        cb(self, virDomain(self, _obj=dom), devAlias, 
reason, opaque)120        return 0121    def _dispatchDomainEventPMWakeupCallback(self, dom, reason, cbData):122        """Dispatches event to python user domain pmwakeup event callbacks123        """124        cb = cbData["cb"]125        opaque = cbData["opaque"]126        cb(self, virDomain(self, _obj=dom), reason, opaque)127        return 0128    def _dispatchDomainEventPMSuspendCallback(self, dom, reason, cbData):129        """Dispatches event to python user domain pmsuspend event callbacks130        """131        cb = cbData["cb"]132        opaque = cbData["opaque"]133        cb(self, virDomain(self, _obj=dom), reason, opaque)134        return 0135    def _dispatchDomainEventBalloonChangeCallback(self, dom, actual, cbData):136        """Dispatches events to python user domain balloon change event callbacks137        """138        cb = cbData["cb"]139        opaque = cbData["opaque"]140        cb(self, virDomain(self, _obj=dom), actual, opaque)141        return 0142    def _dispatchDomainEventPMSuspendDiskCallback(self, dom, reason, cbData):143        """Dispatches event to python user domain pmsuspend-disk event callbacks144        """145        cb = cbData["cb"]146        opaque = cbData["opaque"]147        cb(self, virDomain(self, _obj=dom), reason, opaque)148        return 0149    def _dispatchDomainEventDeviceRemovedCallback(self, dom, devAlias, cbData):150        """Dispatches event to python user domain device removed event callbacks151        """152        cb = cbData["cb"]153        opaque = cbData["opaque"]154        cb(self, virDomain(self, _obj=dom), devAlias, opaque)155        return 0156    def _dispatchDomainEventTunableCallback(self, dom, params, cbData):157        """Dispatches event to python user domain tunable event callbacks158        """159        cb = cbData["cb"]160        opaque = cbData["opaque"]161        cb(self, virDomain(self, _obj=dom), params, opaque)162        return 0163    def _dispatchDomainEventAgentLifecycleCallback(self, 
dom, state, reason, cbData):164        """Dispatches event to python user domain agent lifecycle event callback165        """166        cb = cbData["cb"]167        opaque = cbData["opaque"]168        cb(self, virDomain(self, _obj=dom), state, reason, opaque)169        return 0170    def _dispatchDomainEventDeviceAddedCallback(self, dom, devAlias, cbData):171        """Dispatches event to python user domain device added event callbacks172        """173        cb = cbData["cb"]174        opaque = cbData["opaque"]175        cb(self, virDomain(self, _obj=dom), devAlias, opaque)176        return 0177    def domainEventDeregisterAny(self, callbackID):178        """Removes a Domain Event Callback. De-registering for a179           domain callback will disable delivery of this event type """180        try:181            ret = libvirtmod.virConnectDomainEventDeregisterAny(self._o, callbackID)182            if ret == -1: raise libvirtError ('virConnectDomainEventDeregisterAny() failed', conn=self)183            del self.domainEventCallbackID[callbackID]184        except AttributeError:185            pass186    def _dispatchNetworkEventLifecycleCallback(self, net, event, detail, cbData):187        """Dispatches events to python user network lifecycle event callbacks188        """189        cb = cbData["cb"]190        opaque = cbData["opaque"]191        cb(self, virNetwork(self, _obj=net), event, detail, opaque)192        return 0193    def networkEventDeregisterAny(self, callbackID):194        """Removes a Network Event Callback. 
De-registering for a195           network callback will disable delivery of this event type"""196        try:197            ret = libvirtmod.virConnectNetworkEventDeregisterAny(self._o, callbackID)198            if ret == -1: raise libvirtError ('virConnectNetworkEventDeregisterAny() failed', conn=self)199            del self.networkEventCallbackID[callbackID]200        except AttributeError:201            pass202    def networkEventRegisterAny(self, net, eventID, cb, opaque):203        """Adds a Network Event Callback. Registering for a network204           callback will enable delivery of the events"""205        if not hasattr(self, 'networkEventCallbackID'):206            self.networkEventCallbackID = {}207        cbData = { "cb": cb, "conn": self, "opaque": opaque }208        if net is None:209            ret = libvirtmod.virConnectNetworkEventRegisterAny(self._o, None, eventID, cbData)210        else:211            ret = libvirtmod.virConnectNetworkEventRegisterAny(self._o, net._o, eventID, cbData)212        if ret == -1:213            raise libvirtError ('virConnectNetworkEventRegisterAny() failed', conn=self)214        self.networkEventCallbackID[ret] = opaque215        return ret216    def domainEventRegisterAny(self, dom, eventID, cb, opaque):217        """Adds a Domain Event Callback. 
Registering for a domain218           callback will enable delivery of the events """219        if not hasattr(self, 'domainEventCallbackID'):220            self.domainEventCallbackID = {}221        cbData = { "cb": cb, "conn": self, "opaque": opaque }222        if dom is None:223            ret = libvirtmod.virConnectDomainEventRegisterAny(self._o, None, eventID, cbData)224        else:225            ret = libvirtmod.virConnectDomainEventRegisterAny(self._o, dom._o, eventID, cbData)226        if ret == -1:227            raise libvirtError ('virConnectDomainEventRegisterAny() failed', conn=self)228        self.domainEventCallbackID[ret] = opaque229        return ret230    def listAllDomains(self, flags=0):231        """List all domains and returns a list of domain objects"""232        ret = libvirtmod.virConnectListAllDomains(self._o, flags)233        if ret is None:234            raise libvirtError("virConnectListAllDomains() failed", conn=self)235        retlist = list()236        for domptr in ret:237            retlist.append(virDomain(self, _obj=domptr))238        return retlist239    def listAllStoragePools(self, flags=0):240        """Returns a list of storage pool objects"""241        ret = libvirtmod.virConnectListAllStoragePools(self._o, flags)242        if ret is None:243            raise libvirtError("virConnectListAllStoragePools() failed", conn=self)244        retlist = list()245        for poolptr in ret:246            retlist.append(virStoragePool(self, _obj=poolptr))247        return retlist248    def listAllNetworks(self, flags=0):249        """Returns a list of network objects"""250        ret = libvirtmod.virConnectListAllNetworks(self._o, flags)251        if ret is None:252            raise libvirtError("virConnectListAllNetworks() failed", conn=self)253        retlist = list()254        for netptr in ret:255            retlist.append(virNetwork(self, _obj=netptr))256        return retlist257    def listAllInterfaces(self, flags=0):258        
"""Returns a list of interface objects"""259        ret = libvirtmod.virConnectListAllInterfaces(self._o, flags)260        if ret is None:261            raise libvirtError("virConnectListAllInterfaces() failed", conn=self)262        retlist = list()263        for ifaceptr in ret:264            retlist.append(virInterface(self, _obj=ifaceptr))265        return retlist266    def listAllDevices(self, flags=0):267        """Returns a list of host node device objects"""268        ret = libvirtmod.virConnectListAllNodeDevices(self._o, flags)269        if ret is None:270            raise libvirtError("virConnectListAllNodeDevices() failed", conn=self)271        retlist = list()272        for devptr in ret:273            retlist.append(virNodeDevice(self, _obj=devptr))274        return retlist275    def listAllNWFilters(self, flags=0):276        """Returns a list of network filter objects"""277        ret = libvirtmod.virConnectListAllNWFilters(self._o, flags)278        if ret is None:279            raise libvirtError("virConnectListAllNWFilters() failed", conn=self)280        retlist = list()281        for filter_ptr in ret:282            retlist.append(virNWFilter(self, _obj=filter_ptr))283        return retlist284    def listAllSecrets(self, flags=0):285        """Returns a list of secret objects"""286        ret = libvirtmod.virConnectListAllSecrets(self._o, flags)287        if ret is None:288            raise libvirtError("virConnectListAllSecrets() failed", conn=self)289        retlist = list()290        for secret_ptr in ret:291            retlist.append(virSecret(self, _obj=secret_ptr))292        return retlist293    def _dispatchCloseCallback(self, reason, cbData):294        """Dispatches events to python user close callback"""295        cb = cbData["cb"]296        opaque = cbData["opaque"]297        cb(self, reason, opaque)298        return 0299    def unregisterCloseCallback(self):300        """Removes a close event callback"""301        ret = 
libvirtmod.virConnectUnregisterCloseCallback(self._o)302        if ret == -1: raise libvirtError ('virConnectUnregisterCloseCallback() failed', conn=self)303    def registerCloseCallback(self, cb, opaque):304        """Adds a close event callback, providing a notification305         when a connection fails / closes"""306        cbData = { "cb": cb, "conn": self, "opaque": opaque }307        ret = libvirtmod.virConnectRegisterCloseCallback(self._o, cbData)308        if ret == -1:309            raise libvirtError ('virConnectRegisterCloseCallback() failed', conn=self)310        return ret311    def createXMLWithFiles(self, xmlDesc, files, flags=0):...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!
