Skip to content

Commit

Permalink
fix: if one source fails other should continue still
Browse files Browse the repository at this point in the history
  • Loading branch information
bl4ko committed Mar 2, 2024
1 parent 8ad3e78 commit a69c706
Show file tree
Hide file tree
Showing 7 changed files with 2,035 additions and 2,206 deletions.
56 changes: 29 additions & 27 deletions cmd/netbox-ssot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ func main() {
// Create our main context
mainCtx := context.Background()
mainCtx = context.WithValue(mainCtx, constants.CtxSourceKey, "main")
mainCtx, cancel := context.WithCancel(mainCtx)

// Initialize Logger
ssotLogger, err := logger.New(config.Logger.Dest, config.Logger.Level)
Expand Down Expand Up @@ -58,6 +57,8 @@ func main() {

// Variable to store if the run was successful. If it wasn't we don't remove orphans.
successfullRun := true
// Variable to store failed sources
encounteredErrors := map[string]bool{}

// Go through all sources and sync data
var wg sync.WaitGroup
Expand All @@ -76,31 +77,32 @@ func main() {
// Run each source in parallel
go func(sourceCtx context.Context, source common.Source) {
defer wg.Done()
select {
case <-sourceCtx.Done():
ssotLogger.Infof(sourceCtx, "Signal received closing source")
default:
// Source initialization
ssotLogger.Info(sourceCtx, "Initializing source")
err = source.Init()
if err != nil {
ssotLogger.Error(sourceCtx, err)
successfullRun = false
cancel()
return
}
ssotLogger.Infof(sourceCtx, "Successfully initialized source %s", constants.CheckMark)
sourceName, ok := sourceCtx.Value(constants.CtxSourceKey).(string)
if !ok {
ssotLogger.Errorf(sourceCtx, "source ctx value is not set")
return
}
// Source initialization
ssotLogger.Info(sourceCtx, "Initializing source")
err = source.Init()
if err != nil {
ssotLogger.Error(sourceCtx, err)
successfullRun = false
encounteredErrors[sourceName] = true
return
}
ssotLogger.Infof(sourceCtx, "Successfully initialized source %s", constants.CheckMark)

// Source synchronization
ssotLogger.Info(sourceCtx, "Syncing source...")
err = source.Sync(netboxInventory)
if err != nil {
ssotLogger.Error(sourceCtx, err)
cancel()
return
}
ssotLogger.Infof(sourceCtx, "Source synced successfully %s", constants.CheckMark)
// Source synchronization
ssotLogger.Info(sourceCtx, "Syncing source...")
err = source.Sync(netboxInventory)
if err != nil {
successfullRun = false
ssotLogger.Error(sourceCtx, err)
encounteredErrors[sourceName] = true
return
}
ssotLogger.Infof(sourceCtx, "Source synced successfully %s", constants.CheckMark)
}(sourceCtx, source)
}
wg.Wait()
Expand All @@ -118,14 +120,14 @@ func main() {
ssotLogger.Info(mainCtx, "Skipping removing orphaned objects...")
}

// End the context if it hasn't been yet
cancel()
duration := time.Since(startTime)
minutes := int(duration.Minutes())
seconds := int((duration - time.Duration(minutes)*time.Minute).Seconds())
if successfullRun {
ssotLogger.Infof(mainCtx, "%s Syncing took %d min %d sec in total", constants.Rocket, minutes, seconds)
} else {
ssotLogger.Fatalf("%s syncing was unsuccessful", constants.WarningSign)
for source := range encounteredErrors {
ssotLogger.Infof(mainCtx, "%s syncing of source %s failed", constants.WarningSign, source)
}
}
}
100 changes: 45 additions & 55 deletions internal/source/dnac/dnac.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,71 +39,61 @@ type DnacSource struct {
}

func (ds *DnacSource) Init() error {
select {
case <-ds.Ctx.Done():
return fmt.Errorf("goroutine ended by context")
default:
dnacURL := fmt.Sprintf("%s://%s:%d", ds.Config.SourceConfig.HTTPScheme, ds.Config.SourceConfig.Hostname, ds.Config.SourceConfig.Port)
Client, err := dnac.NewClientWithOptions(dnacURL, ds.SourceConfig.Username, ds.SourceConfig.Password, "false", strconv.FormatBool(ds.SourceConfig.ValidateCert), nil)
if err != nil {
return fmt.Errorf("creating dnac client: %s", err)
}
dnacURL := fmt.Sprintf("%s://%s:%d", ds.Config.SourceConfig.HTTPScheme, ds.Config.SourceConfig.Hostname, ds.Config.SourceConfig.Port)
Client, err := dnac.NewClientWithOptions(dnacURL, ds.SourceConfig.Username, ds.SourceConfig.Password, "false", strconv.FormatBool(ds.SourceConfig.ValidateCert), nil)
if err != nil {
return fmt.Errorf("creating dnac client: %s", err)
}

// Initialize regex relations for this source
ds.VlanGroupRelations = utils.ConvertStringsToRegexPairs(ds.SourceConfig.VlanGroupRelations)
ds.Logger.Debugf(ds.Ctx, "VlanGroupRelations: %s", ds.VlanGroupRelations)
ds.VlanTenantRelations = utils.ConvertStringsToRegexPairs(ds.SourceConfig.VlanTenantRelations)
ds.Logger.Debugf(ds.Ctx, "VlanTenantRelations: %s", ds.VlanTenantRelations)
ds.HostTenantRelations = utils.ConvertStringsToRegexPairs(ds.SourceConfig.HostTenantRelations)
ds.Logger.Debugf(ds.Ctx, "HostTenantRelations: %s", ds.HostTenantRelations)
// Initialize regex relations for this source
ds.VlanGroupRelations = utils.ConvertStringsToRegexPairs(ds.SourceConfig.VlanGroupRelations)
ds.Logger.Debugf(ds.Ctx, "VlanGroupRelations: %s", ds.VlanGroupRelations)
ds.VlanTenantRelations = utils.ConvertStringsToRegexPairs(ds.SourceConfig.VlanTenantRelations)
ds.Logger.Debugf(ds.Ctx, "VlanTenantRelations: %s", ds.VlanTenantRelations)
ds.HostTenantRelations = utils.ConvertStringsToRegexPairs(ds.SourceConfig.HostTenantRelations)
ds.Logger.Debugf(ds.Ctx, "HostTenantRelations: %s", ds.HostTenantRelations)

// Initialize items from vsphere API to local storage
initFunctions := []func(*dnac.Client) error{
ds.InitSites,
ds.InitMemberships,
ds.InitDevices,
ds.InitInterfaces,
}
// Initialize items from vsphere API to local storage
initFunctions := []func(*dnac.Client) error{
ds.InitSites,
ds.InitMemberships,
ds.InitDevices,
ds.InitInterfaces,
}

for _, initFunc := range initFunctions {
startTime := time.Now()
if err := initFunc(Client); err != nil {
return fmt.Errorf("dnac initialization failure: %v", err)
}
duration := time.Since(startTime)
ds.Logger.Infof(ds.Ctx, "Successfully initialized %s in %f seconds", utils.ExtractFunctionName(initFunc), duration.Seconds())
for _, initFunc := range initFunctions {
startTime := time.Now()
if err := initFunc(Client); err != nil {
return fmt.Errorf("dnac initialization failure: %v", err)
}
return nil
duration := time.Since(startTime)
ds.Logger.Infof(ds.Ctx, "Successfully initialized %s in %f seconds", utils.ExtractFunctionName(initFunc), duration.Seconds())
}
return nil
}

func (ds *DnacSource) Sync(nbi *inventory.NetboxInventory) error {
select {
case <-ds.Ctx.Done():
return fmt.Errorf("goroutine ended by context")
default:
// initialize variables, that are shared between sync functions
ds.VID2nbVlan = make(map[int]*objects.Vlan)
ds.SiteID2nbSite = make(map[string]*objects.Site)
ds.DeviceID2nbDevice = make(map[string]*objects.Device)
ds.InterfaceID2nbInterface = make(map[string]*objects.Interface)
// initialize variables, that are shared between sync functions
ds.VID2nbVlan = make(map[int]*objects.Vlan)
ds.SiteID2nbSite = make(map[string]*objects.Site)
ds.DeviceID2nbDevice = make(map[string]*objects.Device)
ds.InterfaceID2nbInterface = make(map[string]*objects.Interface)

syncFunctions := []func(*inventory.NetboxInventory) error{
ds.SyncSites,
ds.SyncVlans,
ds.SyncDevices,
ds.SyncDeviceInterfaces,
}
syncFunctions := []func(*inventory.NetboxInventory) error{
ds.SyncSites,
ds.SyncVlans,
ds.SyncDevices,
ds.SyncDeviceInterfaces,
}

for _, syncFunc := range syncFunctions {
startTime := time.Now()
err := syncFunc(nbi)
if err != nil {
return err
}
duration := time.Since(startTime)
ds.Logger.Infof(ds.Ctx, "Successfully synced %s in %f seconds", utils.ExtractFunctionName(syncFunc), duration.Seconds())
for _, syncFunc := range syncFunctions {
startTime := time.Now()
err := syncFunc(nbi)
if err != nil {
return err
}
return nil
duration := time.Since(startTime)
ds.Logger.Infof(ds.Ctx, "Successfully synced %s in %f seconds", utils.ExtractFunctionName(syncFunc), duration.Seconds())
}
return nil
}
Loading

0 comments on commit a69c706

Please sign in to comment.