How to use NewConfig method of csv Package

Best K6 code snippet using csv.NewConfig

config.go

Source:config.go Github

copy

Full Screen

1// Copyright (c) 2021 Damien Stuart. All rights reserved.2//3// Use of this source code is governed by the MIT License that can be found4// in the LICENSE file.5//6package main7import (8 "flag"9 "fmt"10 "io/ioutil"11 "os"12 "path/filepath"13 "regexp"14 "strconv"15 "strings"16 "github.com/creasty/defaults"17 g "github.com/gosnmp/gosnmp"18 "gopkg.in/yaml.v2"19)20/* ===========================================================21Notes on YAML configuration processing:22 * Variables that start with capital letters are processed (at least, for JSON)23 * Renaming of variables for the YAML file is done with the `yaml:` directives24 * Renamed variables *must* be in quotes to be recognized correctly (at least for underscores)25 * Default values are being applied with the creasty/defaults module26 * Non-basic types and classes can't be instantiated directly (eg g.SHA)27 * Configuration data structures have two sets of variables: text and usable28 * Per convention, the text versions start with uppercase, the usable ones start lowercase29 * Filter lines are very problematic for YAML30 * some characters (I'm looking at you ':' -- also regex) cause YAML to barf31 * using a more YAML-like structure will eat up huge chunks of configuration lines32 eg33 * * * * * ^1\.3\.6\.1\.4.1\.546\.1\.1 break34 vs35 - snmpversions: *36 source_ip: *37 agent_address: *38 ...39 ===========================================================40*/41type v3Params struct {42 MsgFlags string `default:"NoAuthNoPriv" yaml:"msg_flags"`43 msgFlags g.SnmpV3MsgFlags `default:"g.NoAuthNoPriv"`44 Username string `default:"XXv3Username" yaml:"username"`45 AuthProto string `default:"NoAuth" yaml:"auth_protocol"`46 authProto g.SnmpV3AuthProtocol `default:"g.NoAuth"`47 AuthPassword string `default:"XXv3authPass" yaml:"auth_password"`48 PrivacyProto string `default:"NoPriv" yaml:"privacy_protocol"`49 privacyProto g.SnmpV3PrivProtocol `default:"g.NoPriv"`50 PrivacyPassword string `default:"XXv3Pass" 
yaml:"privacy_password"`51}52type ipSet map[string]bool53type trapexConfig struct {54 teConfigured bool55 runLogFile string56 configFile string57 General struct {58 Hostname string `yaml:"hostname"`59 ListenAddr string `default:"0.0.0.0" yaml:"listen_address"`60 ListenPort string `default:"162" yaml:"listen_port"`61 IgnoreVersions []string `default:"[]" yaml:"ignore_versions"`62 ignoreVersions []g.SnmpVersion `default:"[]"`63 PrometheusIp string `default:"0.0.0.0" yaml:"prometheus_ip"`64 PrometheusPort string `default:"80" yaml:"prometheus_port"`65 PrometheusEndpoint string `default:"metrics" yaml:"prometheus_endpoint"`66 }67 Logging struct {68 Level string `default:"info" yaml:"level"`69 LogMaxSize int `default:"1024" yaml:"log_size_max"`70 LogMaxBackups int `default:"7" yaml:"log_backups_max"`71 LogMaxAge int `yaml:"log_age_max"`72 LogCompress bool `default:"false" yaml:"compress_rotated_logs"`73 }74 V3Params v3Params `yaml:"snmpv3"`75 IpSets []map[string][]string `default:"{}" yaml:"ip_sets"`76 ipSets map[string]ipSet `default:"{}"`77 RawFilters []string `default:"[]" yaml:"filters"`78 filters []trapexFilter79}80type trapexCommandLine struct {81 configFile string82 bindAddr string83 listenPort string84 debugMode bool85}86// Global vars87//88var teConfig *trapexConfig89var teCmdLine trapexCommandLine90var ipRe = regexp.MustCompile(`^(?:\d{1,3}\.){3}\d{1,3}$`)91func showUsage() {92 usageText := `93Usage: trapex [-h] [-c <config_file>] [-b <bind_ip>] [-p <listen_port>]94 [-d] [-v]95 -h - Show this help message and exit.96 -c - Override the location of the trapex configuration file.97 -b - Override the bind IP address on which to listen for incoming traps.98 -p - Override the UDP port on which to listen for incoming traps.99 -d - Enable debug mode (note: produces very verbose runtime output).100 -v - Print the version of trapex and exit.101`102 fmt.Println(usageText)103}104func processCommandLine() {105 flag.Usage = showUsage106 c := flag.String("c", 
"/etc/trapex.conf", "")107 b := flag.String("b", "", "")108 p := flag.String("p", "", "")109 d := flag.Bool("d", false, "")110 showVersion := flag.Bool("v", false, "")111 flag.Parse()112 if *showVersion {113 fmt.Printf("This is trapex version %s\n", myVersion)114 os.Exit(0)115 }116 teCmdLine.configFile = *c117 teCmdLine.bindAddr = *b118 teCmdLine.listenPort = *p119 teCmdLine.debugMode = *d120}121// loadConfig122// Load a YAML file with configuration, and create a new object123func loadConfig(config_file string, newConfig *trapexConfig) error {124 defaults.Set(newConfig)125 newConfig.ipSets = make(map[string]ipSet)126 filename, _ := filepath.Abs(config_file)127 yamlFile, err := ioutil.ReadFile(filename)128 if err != nil {129 return err130 }131 err = yaml.UnmarshalStrict(yamlFile, newConfig)132 if err != nil {133 return err134 }135 return nil136}137func applyCliOverrides(newConfig *trapexConfig) {138 // Override the listen address:port if they were specified on the139 // command line. If not and the listener values were not set in140 // the config file, fallback to defaults.141 if teCmdLine.bindAddr != "" {142 newConfig.General.ListenAddr = teCmdLine.bindAddr143 }144 if teCmdLine.listenPort != "" {145 newConfig.General.ListenPort = teCmdLine.listenPort146 }147 if teCmdLine.debugMode {148 newConfig.Logging.Level = "debug"149 }150 if newConfig.General.Hostname == "" {151 myName, err := os.Hostname()152 if err != nil {153 newConfig.General.Hostname = "_undefined"154 } else {155 newConfig.General.Hostname = myName156 }157 }158}159func getConfig() error {160 var operation string161 // If this is a reconfig close any current handles162 if teConfig != nil && teConfig.teConfigured {163 operation = "Reloading "164 } else {165 operation = "Loading "166 }167 logger.Info().Str("version", myVersion).Str("configuration_file", teCmdLine.configFile).Msg(operation + "configuration for trapex")168 var newConfig trapexConfig169 err := loadConfig(teCmdLine.configFile, &newConfig)170 if 
err != nil {171 return err172 }173 applyCliOverrides(&newConfig)174 if err = validateIgnoreVersions(&newConfig); err != nil {175 return err176 }177 if err = validateSnmpV3Args(&newConfig); err != nil {178 return err179 }180 if err = processIpSets(&newConfig); err != nil {181 return err182 }183 if err = processFilters(&newConfig); err != nil {184 return err185 }186 // If this is a reconfigure, close the old handles here187 if teConfig != nil && teConfig.teConfigured {188 closeTrapexHandles()189 }190 // Set our global config pointer to this configuration191 newConfig.teConfigured = true192 teConfig = &newConfig193 return nil194}195func validateIgnoreVersions(newConfig *trapexConfig) error {196 var ignorev1, ignorev2c, ignorev3 bool = false, false, false197 for _, candidate := range newConfig.General.IgnoreVersions {198 switch strings.ToLower(candidate) {199 case "v1", "1":200 if ignorev1 != true {201 newConfig.General.ignoreVersions = append(newConfig.General.ignoreVersions, g.Version1)202 ignorev1 = true203 }204 case "v2c", "2c", "2":205 if ignorev2c != true {206 newConfig.General.ignoreVersions = append(newConfig.General.ignoreVersions, g.Version2c)207 ignorev2c = true208 }209 case "v3", "3":210 if ignorev3 != true {211 newConfig.General.ignoreVersions = append(newConfig.General.ignoreVersions, g.Version3)212 ignorev3 = true213 }214 default:215 return fmt.Errorf("unsupported or invalid value (%s) for general:ignore_version", candidate)216 }217 }218 if len(newConfig.General.ignoreVersions) > 2 {219 return fmt.Errorf("All three SNMP versions are ignored -- there will be no traps to process")220 }221 return nil222}223func validateSnmpV3Args(newConfig *trapexConfig) error {224 switch strings.ToLower(newConfig.V3Params.MsgFlags) {225 case "noauthnopriv":226 newConfig.V3Params.msgFlags = g.NoAuthNoPriv227 case "authnopriv":228 newConfig.V3Params.msgFlags = g.AuthNoPriv229 case "authpriv":230 newConfig.V3Params.msgFlags = g.AuthPriv231 default:232 return 
fmt.Errorf("unsupported or invalid value (%s) for snmpv3:msg_flags", newConfig.V3Params.MsgFlags)233 }234 switch strings.ToLower(newConfig.V3Params.AuthProto) {235 // AES is *NOT* supported236 case "noauth":237 newConfig.V3Params.authProto = g.NoAuth238 case "sha":239 newConfig.V3Params.authProto = g.SHA240 case "md5":241 newConfig.V3Params.authProto = g.MD5242 default:243 return fmt.Errorf("invalid value for snmpv3:auth_protocol: %s", newConfig.V3Params.AuthProto)244 }245 switch strings.ToLower(newConfig.V3Params.PrivacyProto) {246 case "nopriv":247 newConfig.V3Params.privacyProto = g.NoPriv248 case "aes":249 newConfig.V3Params.privacyProto = g.AES250 case "des":251 newConfig.V3Params.privacyProto = g.DES252 default:253 return fmt.Errorf("invalid value for snmpv3:privacy_protocol: %s", newConfig.V3Params.PrivacyProto)254 }255 if (newConfig.V3Params.msgFlags&g.AuthPriv) == 1 && newConfig.V3Params.authProto < 2 {256 return fmt.Errorf("v3 config error: no auth protocol set when snmpv3:msg_flags specifies an Auth mode")257 }258 if newConfig.V3Params.msgFlags == g.AuthPriv && newConfig.V3Params.privacyProto < 2 {259 return fmt.Errorf("v3 config error: no privacy protocol mode set when snmpv3:msg_flags specifies an AuthPriv mode")260 }261 return nil262}263func processIpSets(newConfig *trapexConfig) error {264 for _, stanza := range newConfig.IpSets {265 for ipsName, ips := range stanza {266 logger.Info().Str("ipset", ipsName).Msg("Loading IpSet")267 newConfig.ipSets[ipsName] = make(map[string]bool)268 for _, ip := range ips {269 if ipRe.MatchString(ip) {270 newConfig.ipSets[ipsName][ip] = true271 logger.Debug().Str("ipset", ipsName).Str("ip", ip).Msg("Adding IP to IpSet")272 } else {273 return fmt.Errorf("Invalid IP address (%s) in ipset: %s", ip, ipsName)274 }275 }276 }277 }278 return nil279}280func processFilters(newConfig *trapexConfig) error {281 for lineNumber, filter_line := range newConfig.RawFilters {282 logger.Debug().Str("filter", 
filter_line).Int("line_number", lineNumber).Msg("Examining filter")283 if err := processFilterLine(strings.Fields(filter_line), newConfig, lineNumber); err != nil {284 return err285 }286 }287 return nil288}289// processFilterLine parses a "filter" line and sets290// the appropriate values in a corresponding trapexFilter struct.291//292func processFilterLine(f []string, newConfig *trapexConfig, lineNumber int) error {293 var err error294 if len(f) < 7 {295 return fmt.Errorf("not enough fields in filter line(%v): %s", lineNumber, "filter "+strings.Join(f, " "))296 }297 // Process the filter criteria298 //299 filter := trapexFilter{}300 if strings.HasPrefix(strings.Join(f, " "), "* * * * * *") {301 filter.matchAll = true302 } else {303 fObj := filterObj{}304 // Construct the filter criteria305 for i, fi := range f[:6] {306 if fi == "*" {307 continue308 }309 fObj.filterItem = i310 if i == 0 {311 switch strings.ToLower(fi) {312 case "v1", "1":313 fObj.filterValue = g.Version1314 case "v2c", "2c", "2":315 fObj.filterValue = g.Version2c316 case "v3", "3":317 fObj.filterValue = g.Version3318 default:319 return fmt.Errorf("unsupported or invalid SNMP version (%s) on line %v for filter: %s", fi, lineNumber, f)320 }321 fObj.filterType = parseTypeInt // Just because we should set this to something.322 } else if i == 1 || i == 2 { // Either of the first 2 is an IP address type323 if strings.HasPrefix(fi, "ipset:") { // If starts with a "ipset:"" it's an IP set324 fObj.filterType = parseTypeIPSet325 if _, ok := newConfig.ipSets[fi[6:]]; ok {326 fObj.filterValue = fi[6:]327 } else {328 return fmt.Errorf("Invalid ipset name specified on line %v: %s: %s", lineNumber, fi, f)329 }330 } else if strings.HasPrefix(fi, "/") { // If starts with a "/", it's a regex331 fObj.filterType = parseTypeRegex332 fObj.filterValue, err = regexp.Compile(fi[1:])333 if err != nil {334 return fmt.Errorf("unable to compile regexp for IP on line %v: %s: %s\n%s", lineNumber, fi, err, f)335 }336 } else if 
strings.Contains(fi, "/") {337 fObj.filterType = parseTypeCIDR338 fObj.filterValue, err = newNetwork(fi)339 if err != nil {340 return fmt.Errorf("invalid IP/CIDR at line %v: %s, %s", lineNumber, fi, f)341 }342 } else {343 fObj.filterType = parseTypeString344 fObj.filterValue = fi345 }346 } else if i > 2 && i < 5 { // Generic and Specific type347 val, e := strconv.Atoi(fi)348 if e != nil {349 return fmt.Errorf("invalid integer value at line %v: %s: %s", lineNumber, fi, e)350 }351 fObj.filterType = parseTypeInt352 fObj.filterValue = val353 } else { // The enterprise OID354 fObj.filterType = parseTypeRegex355 fObj.filterValue, err = regexp.Compile(fi)356 if err != nil {357 return fmt.Errorf("unable to compile regexp at line %v for OID: %s: %s", lineNumber, fi, err)358 }359 }360 filter.filterItems = append(filter.filterItems, fObj)361 }362 }363 // Process the filter action364 //365 var actionArg string366 var breakAfter bool367 if len(f) > 8 && f[8] == "break" {368 breakAfter = true369 } else {370 breakAfter = false371 }372 var action = f[6]373 if len(f) > 7 {374 actionArg = f[7]375 }376 switch action {377 case "break", "drop":378 filter.actionType = actionBreak379 case "nat":380 filter.actionType = actionNat381 if actionArg == "" {382 return fmt.Errorf("missing nat argument at line %v", lineNumber)383 }384 filter.actionArg = actionArg385 case "forward":386 if breakAfter {387 filter.actionType = actionForwardBreak388 } else {389 filter.actionType = actionForward390 }391 forwarder := trapForwarder{}392 if err := forwarder.initAction(actionArg); err != nil {393 return err394 }395 filter.action = &forwarder396 case "log":397 if breakAfter {398 filter.actionType = actionLogBreak399 } else {400 filter.actionType = actionLog401 }402 logger := trapLogger{}403 if err := logger.initAction(actionArg, newConfig); err != nil {404 return err405 }406 filter.action = &logger407 case "csv":408 if breakAfter {409 filter.actionType = actionCsvBreak410 } else {411 filter.actionType = 
actionCsv412 }413 csvLogger := trapCsvLogger{}414 if err := csvLogger.initAction(actionArg, newConfig); err != nil {415 return err416 }417 filter.action = &csvLogger418 default:419 return fmt.Errorf("unknown action: %s at line %v", action, lineNumber)420 }421 newConfig.filters = append(newConfig.filters, filter)422 return nil423}424func closeTrapexHandles() {425 for _, f := range teConfig.filters {426 if f.actionType == actionForward || f.actionType == actionForwardBreak {427 f.action.(*trapForwarder).close()428 }429 if f.actionType == actionLog || f.actionType == actionLogBreak {430 f.action.(*trapLogger).close()431 }432 if f.actionType == actionCsv || f.actionType == actionCsvBreak {433 f.action.(*trapCsvLogger).close()434 }435 }436}...

Full Screen

Full Screen

generate_snapshot.go

Source:generate_snapshot.go Github

copy

Full Screen

1package cmd2import (3 "context"4 "encoding/csv"5 "fmt"6 "github.com/jinzhu/copier"7 "github.com/niccoloCastelli/orderbooks/common"8 "github.com/niccoloCastelli/orderbooks/data_formats/cryptotick"9 "github.com/niccoloCastelli/orderbooks/orderbook"10 "github.com/niccoloCastelli/orderbooks/workers/backends/csv_backend"11 "github.com/pkg/errors"12 "github.com/remeh/sizedwaitgroup"13 "github.com/rs/zerolog"14 "github.com/rs/zerolog/log"15 "github.com/spf13/afero"16 "github.com/spf13/cobra"17 "io/ioutil"18 "os"19 "path"20 "strings"21 "time"22)23const timeOffsetDefaultLayout = "2006-01-02T15:04:05Z0700"24const dateOnlyLayout = "2006-01-02"25type reconstructionConfig struct {26 TimeOffsetStr string27 TimeOffset time.Time28 SnapshotInterval time.Duration29 ExchangeName string30 TimeOffsetLayout string31 OutPath string32 NumWorkers int33 MaxSnapshotSize int34 PriceTicks bool35 DateUntil *time.Time36}37func (c *reconstructionConfig) ParseTimeOffset() (err error) {38 c.TimeOffset = time.Time{}39 if c.TimeOffsetStr != "" {40 if c.TimeOffset, err = time.Parse(c.TimeOffsetLayout, c.TimeOffsetStr); err != nil {41 return err42 }43 }44 return nil45}46func initReconstructionCmd() {47 dateUntilStr := ""48 conf := reconstructionConfig{49 TimeOffsetLayout: timeOffsetDefaultLayout,50 ExchangeName: "cryptotick",51 OutPath: "./storage",52 NumWorkers: 1,53 MaxSnapshotSize: 1000,54 }55 // reconstructionCmd represents the reconstruction command56 var reconstructionCmd = &cobra.Command{57 Use: "generate_snapshot",58 Short: "Generate snapshots from cryptotick event files",59 Long: `Generate a snapshot from cryptotick CSV event files. 
generate_snapshot [PATH (data snapshot)] [FILENAME_1 FILENAME_2 ...FILENAME_n]`,60 Run: func(cmd *cobra.Command, args []string) {61 var (62 logger = log.With().Str("cmd", "generate_snapshot").Str("out_path", conf.OutPath).Logger()63 )64 if dateUntilStr != "" {65 if dateUntil, err := time.Parse(dateOnlyLayout, dateUntilStr); err == nil {66 conf.DateUntil = &dateUntil67 logger.Info().Time("process_until", *conf.DateUntil).Send()68 } else {69 logger.Err(err).Msg("time parse error")70 }71 }72 switch len(args) {73 case 1:74 logger.Info().Str("mode", "single").Send()75 if err := conf.ParseTimeOffset(); err != nil {76 logger.Fatal().Err(err).Send()77 }78 if err := snapshotReconstruction(args[0], logger, conf); err != nil {79 logger.Err(err).Send()80 }81 case 2:82 if err := bulkSnapshotReconstruction(args[0], args[1], logger, conf); err != nil {83 logger.Err(err).Send()84 }85 default:86 logger.Fatal().Msg("no input file")87 }88 logger.Info().Msg("done")89 },90 }91 reconstructionCmd.PersistentFlags().StringVar(&dateUntilStr, "date_until", "", "Generate snapshots until this date (leave empty to generate snapshots until end of files)")92 reconstructionCmd.PersistentFlags().BoolVar(&conf.PriceTicks, "price_ticks", false, "Generate snapshots every [interval] price changes")93 reconstructionCmd.PersistentFlags().IntVar(&conf.MaxSnapshotSize, "max_snapshot_size", 1000, "Max orders per snapshot (per side, default: 1000)")94 reconstructionCmd.PersistentFlags().IntVar(&conf.NumWorkers, "num_workers", 8, "Worker processes (default: 8)")95 reconstructionCmd.PersistentFlags().StringVar(&conf.OutPath, "out_path", "./storage", "Generated snapthots path (--out_path /my/storage/path)")96 reconstructionCmd.PersistentFlags().StringVar(&conf.ExchangeName, "exchange_name", "cryptotick", "Exchange name (--exchange_name bitstamp)")97 reconstructionCmd.PersistentFlags().StringVar(&conf.TimeOffsetStr, "time_offset", "", "Offset time from first shapshot (--time_offset 2019-12-30T00:00:00)")98 
reconstructionCmd.PersistentFlags().StringVar(&conf.TimeOffsetLayout, "time_offset_layout", timeOffsetDefaultLayout, "Time offset date layout (--time_offset_layout 2006-01-02T15:04:05Z0700)")99 reconstructionCmd.PersistentFlags().DurationVar(&conf.SnapshotInterval, "interval", time.Second*30, "30s for time ticks, ")100 rootCmd.AddCommand(reconstructionCmd)101}102func bulkSnapshotReconstruction(filesPath string, fileName string, logger zerolog.Logger, config reconstructionConfig) error {103 dirContent, err := ioutil.ReadDir(filesPath)104 var wg = sizedwaitgroup.New(config.NumWorkers)105 if err != nil {106 return err107 }108 numDirs := len(dirContent)109 logger.Info().Str("mode", "bulk").Int("workers", config.NumWorkers).Int("snapshot_size", config.MaxSnapshotSize).Send()110 startedTasks := 0111 completedTasks := 0112 for i, f := range dirContent {113 var (114 err error115 )116 if !f.IsDir() {117 continue118 }119 absPath := path.Join(filesPath, f.Name(), fileName)120 fLogger := logger.With().Bool("bulk", true).Int("index", i).Int("dir_count", numDirs).Int("batch", i/config.NumWorkers).Logger()121 fileInfo, err := os.Stat(absPath)122 if err != nil {123 fLogger.Debug().Err(err).Msg("skip fileinfo err")124 continue125 }126 if fileInfo.IsDir() {127 fLogger.Error().Msg("is directory")128 }129 newConfig := reconstructionConfig{}130 if err := copier.Copy(&newConfig, &config); err != nil {131 logger.Fatal().Err(err).Send()132 }133 newConfig.TimeOffsetStr = f.Name()134 newConfig.TimeOffset = time.Time{}135 if err = newConfig.ParseTimeOffset(); err != nil {136 logger.Fatal().Err(err).Send()137 }138 if newConfig.DateUntil != nil && !newConfig.TimeOffset.IsZero() && newConfig.TimeOffset.Sub(*newConfig.DateUntil) >= time.Hour*24 {139 logger.Info().Time("time_offset time", newConfig.TimeOffset).Msg("time limit reached")140 break141 }142 wg.Add()143 startedTasks++144 go func(absPath string, fLogger zerolog.Logger, newConfig reconstructionConfig, wg *sizedwaitgroup.SizedWaitGroup) 
{145 defer wg.Done()146 if err := snapshotReconstruction(absPath, fLogger, newConfig); err != nil {147 fLogger.Fatal().Err(err).Send()148 }149 completedTasks++150 }(absPath, fLogger, newConfig, &wg)151 }152 logger.Info().Int("started", startedTasks).Int("completed", completedTasks).Msg("waiting... ")153 wg.Wait()154 return nil155}156func snapshotReconstruction(eventsFile string, logger zerolog.Logger, config reconstructionConfig) error {157 var (158 snapshotCount int159 snapshotChan = make(chan common.Snapshot, 32)160 sw = csv_backend.NewSnapshotWriter(afero.NewBasePathFs(afero.NewOsFs(), config.OutPath), snapshotChan, true)161 ctx, cancelCtx = context.WithCancel(context.Background())162 )163 _, err := os.Stat(eventsFile)164 if err != nil {165 logger.Fatal().Err(err).Send()166 }167 csvFile, err := os.Open(eventsFile)168 if err != nil {169 logger.Fatal().Err(err).Send()170 }171 if err := sw.Run(ctx); err != nil {172 logger.Fatal().Err(errors.Wrap(errors.Cause(err), "writer start error")).Send()173 }174 logger = logger.With().Str("file", eventsFile).Logger()175 logger.Debug().Msg("open csv file ok")176 logger.Info().Str("interval", fmt.Sprintf("%v", config.SnapshotInterval)).Str("offset", config.TimeOffset.String()).Msg("open csv file ok")177 defer csvFile.Close()178 reader := csv.NewReader(csvFile)179 reader.Comma = ';'180 ob := orderbook.NewAddModeOrderBook(logger, nil).LimitSnapshotSize(config.MaxSnapshotSize).SetTimeOffset(config.TimeOffset)181 previousUpdate := ob.LastUpdate()182 lastUpdateType := cryptotick.UpdateModeSnapshot183 resetPerformed := false184 priceUpdated := false185 lastAsk, lastBid := ob.BestPrices()186 for i := 0; ; i++ {187 resetPerformed = false188 priceUpdated = false189 csvLine, err := reader.Read()190 if err != nil {191 logger.Debug().Err(err)192 break193 }194 if i == 0 {195 logger.Debug().Str("headers", strings.Join(csvLine, ", ")).Send()196 continue197 }198 parsedRow, err := cryptotick.ParseCryptotyickRow(csvLine)199 if err != nil {200 
logger.Error().Err(err).Send()201 break202 }203 if parsedRow.UpdateType == cryptotick.UpdateModeSnapshot && lastUpdateType != cryptotick.UpdateModeSnapshot {204 ob = orderbook.NewAddModeOrderBook(logger, nil).LimitSnapshotSize(config.MaxSnapshotSize).SetTimeOffset(config.TimeOffset)205 resetPerformed = true206 }207 ob.Update(parsedRow.ToOrder(), false)208 lastUpdateType = parsedRow.UpdateType209 if previousUpdate.IsZero() {210 previousUpdate = ob.LastUpdate()211 continue212 }213 lastUpdate := ob.LastUpdate()214 if parsedRow.UpdateType != cryptotick.UpdateModeSnapshot {215 bestAsk, bestBid := ob.BestPrices()216 if bestAsk != lastAsk || bestBid != lastBid {217 priceUpdated = true218 }219 }220 if !resetPerformed && ((!config.PriceTicks && lastUpdate.Sub(previousUpdate) > config.SnapshotInterval) || (config.PriceTicks && priceUpdated)) {221 snapshot := ob.GetSnapshot()222 snapshot.Exchange = "cryptotick_bitstamp" //TODO: parametrizzare223 snapshot.Pair = common.Pair{Base: "BTC", Quote: "USD"}224 snapshotChan <- snapshot225 snapshotCount++226 logger.Debug().Int("snapshot_count", snapshotCount).Str("last_update", lastUpdate.String()).Msg("snapshot")227 previousUpdate = ob.LastUpdate()228 }229 lastAsk, lastBid = ob.BestPrices()230 }231 cancelCtx()232 if err := sw.Close(); err != nil {233 return err234 }235 logger.Info().Msg("file OK")236 return nil237}238func init() {239 initReconstructionCmd()240 // Here you will define your flags and configuration settings.241 // Cobra supports Persistent Flags which will work for this command242 // and all subcommands, e.g.:243 // reconstructionCmd.PersistentFlags().String("foo", "", "A help for foo")244 // Cobra supports local flags which will only run when this command245 // is called directly, e.g.:246 // reconstructionCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")247}...

Full Screen

Full Screen

csv_test.go

Source:csv_test.go Github

copy

Full Screen

1package csv2import (3 "path/filepath"4 "reflect"5 "testing"6 "time"7 "github.com/xh3b4sd/wafer/service/informer"8 runtimeconfigdir "github.com/xh3b4sd/wafer/service/informer/csv/runtime/config/dir"9 runtimeconfigfile "github.com/xh3b4sd/wafer/service/informer/csv/runtime/config/file"10 configfileheader "github.com/xh3b4sd/wafer/service/informer/csv/runtime/config/file/header"11)12func Test_Informer_File_Prices(t *testing.T) {13 path, err := filepath.Abs("./fixtures/file/001.csv")14 if err != nil {15 t.Fatal("expected", nil, "got", err)16 }17 testCases := []struct {18 File runtimeconfigfile.File19 Expected map[int]informer.Price20 }{21 {22 File: runtimeconfigfile.File{23 Header: configfileheader.Header{24 Buy: 9,25 Ignore: true,26 Sell: 10,27 Time: 12,28 },29 Path: path,30 },31 Expected: map[int]informer.Price{32 0: {33 Buy: 797.4000000000,34 Sell: 797.0000000000,35 Time: time.Unix(1391212802, 0),36 },37 4: {38 Buy: 798.8900000000,39 Sell: 797.0000000000,40 Time: time.Unix(1391213041, 0),41 },42 9: {43 Buy: 796.9000000000,44 Sell: 793.0000000000,45 Time: time.Unix(1391213342, 0),46 },47 },48 },49 }50 for i, testCase := range testCases {51 newConfig := DefaultConfig()52 newConfig.File = testCase.File53 newInformer, err := New(newConfig)54 if err != nil {55 t.Fatal("case", i+1, "expected", nil, "got", err)56 }57 // The file configuration for the CSV informer must result in one channel in58 // the channel list, because there is only one CSV file to parse.59 prices := newInformer.Prices()60 if len(prices) != 1 {61 t.Fatal("case", i+1, "expected", 1, "got", len(prices))62 }63 var j int64 for price := range prices[0] {65 for k, p := range testCase.Expected {66 if j != k {67 continue68 }69 if !reflect.DeepEqual(price, p) {70 t.Fatal("case", i+1, "expected", p, "got", price)71 }72 }73 j++74 }75 }76}77// Test_Informer_File_Prices_MultipleCalls makes sure Informer.Prices can be78// called mutliple times, which means the actual price data is cached and always79// available in 
memory.80func Test_Informer_File_Prices_MultipleCalls(t *testing.T) {81 path, err := filepath.Abs("./fixtures/file/001.csv")82 if err != nil {83 t.Fatal("expected", nil, "got", err)84 }85 newConfig := DefaultConfig()86 newConfig.File.Header.Buy = 987 newConfig.File.Header.Ignore = true88 newConfig.File.Header.Sell = 1089 newConfig.File.Header.Time = 1290 newConfig.File.Path = path91 newInformer, err := New(newConfig)92 if err != nil {93 t.Fatal("expected", nil, "got", err)94 }95 // The file configuration for the CSV informer must result in one channel in96 // the channel list, because there is only one CSV file to parse.97 prices1 := newInformer.Prices()98 if len(prices1) != 1 {99 t.Fatal("expected", 1, "got", len(prices1))100 }101 p1 := <-prices1[0]102 prices2 := newInformer.Prices()103 if len(prices2) != 1 {104 t.Fatal("expected", 1, "got", len(prices2))105 }106 p2 := <-prices2[0]107 prices3 := newInformer.Prices()108 if len(prices3) != 1 {109 t.Fatal("expected", 1, "got", len(prices3))110 }111 p3 := <-prices3[0]112 if !reflect.DeepEqual(p1, p2) {113 t.Fatal("expected", true, "got", false)114 }115 if !reflect.DeepEqual(p1, p3) {116 t.Fatal("expected", true, "got", false)117 }118 if !reflect.DeepEqual(p2, p3) {119 t.Fatal("expected", true, "got", false)120 }121}122func Test_Informer_Dir_Prices(t *testing.T) {123 path, err := filepath.Abs("./fixtures/dir/")124 if err != nil {125 t.Fatal("expected", nil, "got", err)126 }127 testCases := []struct {128 Dir runtimeconfigdir.Dir129 Expected []map[int]informer.Price130 }{131 {132 Dir: runtimeconfigdir.Dir{133 Path: path,134 },135 Expected: []map[int]informer.Price{136 {137 0: {138 Buy: 797.4000000000,139 Sell: 797.0000000000,140 Time: time.Unix(1391212802, 0),141 },142 4: {143 Buy: 798.8900000000,144 Sell: 797.0000000000,145 Time: time.Unix(1391213041, 0),146 },147 9: {148 Buy: 796.9000000000,149 Sell: 793.0000000000,150 Time: time.Unix(1391213342, 0),151 },152 },153 {154 0: {155 Buy: 797.4000000000,156 Sell: 
797.0000000000,157 Time: time.Unix(1391212802, 0),158 },159 4: {160 Buy: 798.8900000000,161 Sell: 797.0000000000,162 Time: time.Unix(1391213041, 0),163 },164 9: {165 Buy: 796.9000000000,166 Sell: 793.0000000000,167 Time: time.Unix(1391213342, 0),168 },169 },170 },171 },172 }173 for i, testCase := range testCases {174 newConfig := DefaultConfig()175 newConfig.Dir = testCase.Dir176 newInformer, err := New(newConfig)177 if err != nil {178 t.Fatal("case", i+1, "expected", nil, "got", err)179 }180 // The dir configuration for the CSV informer must result in two channels in181 // the channel list, because there are two CSV directories to parse.182 prices := newInformer.Prices()183 if len(prices) != 2 {184 t.Fatal("case", i+1, "expected", 2, "got", len(prices))185 }186 for d, c := range prices {187 var j int188 for price := range c {189 for k, p := range testCase.Expected[d] {190 if j != k {191 continue192 }193 if !reflect.DeepEqual(price, p) {194 t.Fatal("case", i+1, "expected", p, "got", price)195 }196 }197 j++198 }199 }200 }201}...

Full Screen

Full Screen

NewConfig

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 xlsx, err := excelize.OpenFile("Book1.xlsx")4 if err != nil {5 fmt.Println(err)6 }7 cell := xlsx.GetCellValue("Sheet1", "B2")8 fmt.Println(cell)9 index := xlsx.GetSheetIndex("Sheet1")10 fmt.Println(index)11 rows := xlsx.GetRows("Sheet1")12 for _, row := range rows {13 for _, colCell := range row {14 fmt.Print(colCell, "\t")15 }16 fmt.Println()17 }18}

Full Screen

Full Screen

NewConfig

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 xlFile, err := xlsx.OpenFile(excelFileName)4 if err != nil {5 fmt.Println(err)6 }7 for _, sheet := range xlFile.Sheets {8 for _, row := range sheet.Rows {9 for _, cell := range row.Cells {10 fmt.Printf("%s\t", cell.String())11 }12 fmt.Printf("\n")13 }14 }15}

Full Screen

Full Screen

NewConfig

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 f := excelize.NewFile()4 index := f.NewSheet("Sheet2")5 f.SetCellValue("Sheet1", "A2", "Hello world.")6 f.SetCellValue("Sheet2", "B2", 100)7 f.SetActiveSheet(index)8 if rows, err := f.GetRows("Sheet2"); err == nil {9 for _, row := range rows {10 for _, colCell := range row {11 fmt.Print(colCell, "\t")12 }13 fmt.Println()14 }15 }

Full Screen

Full Screen

NewConfig

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 xlFile, err := xlsx.OpenFile("test.xlsx")4 if err != nil {5 fmt.Println("Error in opening file")6 }7 for _, sheet := range xlFile.Sheets {8 for _, row := range sheet.Rows {9 for _, cell := range row.Cells {10 text := cell.String()11 fmt.Printf("%s\n", text)12 }13 }14 }15}

Full Screen

Full Screen

NewConfig

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 xlFile, err := xlsx.OpenFile(excelFileName)4 if err != nil {5 log.Fatal(err)6 }7 for _, sheet := range xlFile.Sheets {8 for _, row := range sheet.Rows {9 for _, cell := range row.Cells {10 fmt.Print(cell, "\t")11 }12 fmt.Println()13 }14 }15}

Full Screen

Full Screen

NewConfig

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 f, err := os.Open("1.xlsx")4 if err != nil {5 log.Fatal(err)6 }7 xlFile, err := xlsx.OpenBinary(f)8 if err != nil {9 log.Fatal(err)10 }11 text := cell.String()12 fmt.Println(text)13}

Full Screen

Full Screen

NewConfig

Using AI Code Generation

copy

Full Screen

1import (2type Person struct {3}4func main() {5 csvFile, err := os.OpenFile("data.csv", os.O_RDWR|os.O_CREATE, os.ModePerm)6 if err != nil {7 log.Fatal(err)8 }9 defer csvFile.Close()10 people := []*Person{}11 log.Fatal(err)12 }13 fmt.Println(people)14 newFile, err := os.Create("new_data.csv")15 if err != nil {16 log.Fatal(err)17 }18 defer newFile.Close()19 log.Fatal(err)20 }21}

Full Screen

Full Screen

NewConfig

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 records, err := r.ReadAll()4 if err != nil {5 fmt.Println(err)6 }7 for _, record := range records {8 fmt.Println(record)9 }10}

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites for running your first automation test, to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful