Skip to content

Commit

Permalink
pillar: implement NTP sources info messages to the controller
Browse files Browse the repository at this point in the history
This implements NTP sources eve-api/ptoto/info/ntpsources.proto
info messages, which periodically (at least once per 10 min) are
sent to the controller.

NTP sources data is fetched from the chronyd running on EVE over
the unix domain socket with the help of vendor API from the
github.com/facebook/time/ntp/chrony package. Currently this API
supports only monitoring (RO), not control packages. In the future
this can be extended, so `nim` can have full control over the
chronyd and update its servers and pools.

Signed-off-by: Roman Penyaev <r.peniaev@gmail.com>
  • Loading branch information
rouming committed Jun 12, 2024
1 parent dc63d1e commit 4582cf4
Show file tree
Hide file tree
Showing 8 changed files with 335 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/CONFIG-PROPERTIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
| timer.metric.diskscan.interval | integer in seconds | 300 | how frequently device should scan the disk for metrics |
| timer.location.cloud.interval | integer in seconds | 1 hour | how frequently device reports geographic location information to controller |
| timer.location.app.interval | integer in seconds | 20 | how frequently device reports geographic location information to applications (to local profile server and to other apps via meta-data server) |
| timer.ntpsources.interval | integer in seconds | 10 minutes | how frequently device reports information about NTP sources to which EVE has established a connection for the NTP synchronization |
| timer.send.timeout | timer in seconds | 120 | time for each http/send |
| timer.dial.timeout | timer in seconds | 10 | maximum time allowed to establish connection |
| timer.reboot.no.network | integer in seconds | 7 days | reboot after no cloud connectivity |
Expand Down
1 change: 1 addition & 0 deletions pkg/pillar/cmd/zedagent/handleconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type getconfigContext struct {
locationCloudTickerHandle interface{}
locationAppTickerHandle interface{}
localProfileTickerHandle interface{}
ntpSourcesTickerHandle interface{}
pubDevicePortConfig pubsub.Publication
pubPhysicalIOAdapters pubsub.Publication
devicePortConfig types.DevicePortConfig
Expand Down
303 changes: 303 additions & 0 deletions pkg/pillar/cmd/zedagent/handlentp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
// Copyright (c) 2024 Zededa, Inc.
// SPDX-License-Identifier: Apache-2.0

package zedagent

import (
"bytes"
"fmt"
"net"
"os"
"path"
"time"

"github.com/golang/protobuf/ptypes"

chrony "github.com/facebook/time/ntp/chrony"
"github.com/lf-edge/eve-api/go/info"
"github.com/lf-edge/eve/pkg/pillar/flextimer"
"github.com/lf-edge/eve/pkg/pillar/types"
"google.golang.org/protobuf/proto"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
)

const (
// Path to unix chrony socket
unixChronydPath = "/run/chrony/chronyd.sock"
)

// Run a periodic post of the NTP sources information.
func ntpSourcesTimerTask(ctx *zedagentContext, handleChannel chan interface{},
triggerNTPSourcesInfo chan destinationBitset) {

// Ticker for periodic publishing NTP sources to the controller.
globalInterval := ctx.globalConfig.GlobalValueInt(types.NTPSourcesInterval)
interval := time.Duration(globalInterval) * time.Second
max := float64(interval)
min := max * 0.3
ticker := flextimer.NewRangeTicker(time.Duration(min), time.Duration(max))

// Return handles to the caller.
handleChannel <- ticker

wdName := agentName + "-ntp"

// Run a periodic timer so we always update StillRunning
stillRunning := time.NewTicker(25 * time.Second)
ctx.ps.StillRunning(wdName, warningTime, errorTime)
ctx.ps.RegisterFileWatchdog(wdName)

for {
select {
case <-ticker.C:
publishNTPSources(ctx, wdName, ControllerDest)
case dest := <-triggerNTPSourcesInfo:
publishNTPSources(ctx, wdName, dest)
case <-stillRunning.C:
}
ctx.ps.StillRunning(wdName, warningTime, errorTime)
}
}

// Called when globalConfig changes.
// Assumes that the caller has verifier that the interval has changed.
func updateNTPSourcesTimer(ctx *getconfigContext, globalInterval uint32) {
if ctx.ntpSourcesTickerHandle == nil {
log.Warnf("updateNTPSourcesTimer: ticker is still nil")
return
}
interval := time.Duration(globalInterval) * time.Second
log.Functionf("updateNTPSourcesTimer: interval change to %v", interval)
max := float64(interval)
min := max * 0.3
flextimer.UpdateRangeTicker(ctx.ntpSourcesTickerHandle,
time.Duration(min), time.Duration(max))
// Force an immediate timeout since timer could have decreased.
flextimer.TickNow(ctx.ntpSourcesTickerHandle)
}

func publishNTPSources(ctx *zedagentContext, wdName string,
dest destinationBitset) {
info := getNTPSourcesInfo(ctx)
if info == nil {
// Not available.
return
}
start := time.Now()
publishNTPSourcesToDest(ctx, info, dest)
ctx.ps.CheckMaxTimeTopic(wdName, "publishNTPSources", start,
warningTime, errorTime)
}

func publishNTPSourcesToDest(ctx *zedagentContext,
infoNTPSources *info.ZInfoNTPSources, dest destinationBitset) {

// TODO: we don't support LPS
if (dest &^ LPSDest) == 0 {
return
}
infoMsg := &info.ZInfoMsg{
Ztype: info.ZInfoTypes_ZiNTPSources,
DevId: devUUID.String(),
InfoContent: &info.ZInfoMsg_NtpSources{
NtpSources: infoNTPSources,
},
AtTimeStamp: ptypes.TimestampNow(),
}

log.Functionf("publishNTPSourcesToDest: sending %v", infoMsg)
data, err := proto.Marshal(infoMsg)
if err != nil {
log.Fatal("publishNTPSourcesToController: proto marshaling error: ", err)
}
buf := bytes.NewBuffer(data)
if buf == nil {
log.Fatal("malloc error")
}
size := int64(proto.Size(infoMsg))

const bailOnHTTPErr = false
const withNetTrace = false
key := "ntpsources:" + devUUID.String()

// Even for the controller destination we can't stall the queue on error,
// because this is recurring call, so set @forcePeriodic to true
forcePeriodic := true
queueInfoToDest(ctx, dest, key, buf, size, bailOnHTTPErr, withNetTrace,
forcePeriodic, info.ZInfoTypes_ZiNTPSources)
}

// createNTPSource() returns `info.NTPSource`. The code is based on the
// https://github.com/facebook/time/blob/main/cmd/ntpcheck/checker
func createNTPSource(s *chrony.ReplySourceData,
p *chrony.ReplyNTPData,
n *chrony.ReplyNTPSourceName) (*info.NTPSource, error) {

// Clear auth and interleaved flag
flash := s.Flags & chrony.NTPFlagsTests
// Don't report all flashers if peer is unreachable
if flash > 0 {
flash ^= chrony.NTPFlagsTests
}
ntpSource := info.NTPSource{
Authenticated: (s.Flags & chrony.NTPFlagAuthenticated) != 0,
Reachable: s.Reachability == 255, // all 8 attempts
Reachability: uint32(s.Reachability),
// We have to advance on 1 due to the UNSPECIFIED enum in protobuf
Mode: info.NTPSourceMode(s.Mode + 1),
// We have to advance on 1 due to the UNSPECIFIED enum in protobuf
State: info.NTPSourceState(s.State + 1),
Flags: uint32(flash),
// sourceData offset and NTPData offset sign has opposite meaning
Offset: -1 * s.OrigLatestMeas,
Poll: int32(s.Poll),
Stratum: uint32(s.Stratum),
// Address of the NTP peer, so destination
DstAddr: s.IPAddr.String(),
}
if ntpSource.Mode > info.NTPSourceMode_NTP_SOURCE_MODE_REF {
ntpSource.Mode = info.NTPSourceMode_NTP_SOURCE_MODE_UNSPECIFIED
}
if ntpSource.State > info.NTPSourceState_NTP_SOURCE_STATE_OUTLIER {
ntpSource.State = info.NTPSourceState_NTP_SOURCE_STATE_UNSPECIFIED
}

// Populate data from NTPData struct
if p != nil {
refID := chrony.RefidAsHEX(p.RefID)
// Only stratum 1 servers can have GPS or something else as string refID
if p.Stratum == 1 {
refIDStr := chrony.RefidToString(p.RefID)
if len(refIDStr) > 0 {
refID = refIDStr
}
}
ntpSource.Leap = uint32(p.Leap)
ntpSource.Poll = int32(p.Poll)
// Local address the connection to NTP peer, so source
ntpSource.SrcAddr = p.LocalAddr.String()
ntpSource.RefTime = timestamppb.New(p.RefTime)
ntpSource.Offset = p.Offset
ntpSource.Dispersion = p.PeerDispersion
// Missing that info
ntpSource.SrcPort = 0
ntpSource.DstPort = uint32(p.RemotePort)
ntpSource.RefId = refID
ntpSource.Jitter = p.PeerDispersion
ntpSource.RootDelay = p.RootDelay
ntpSource.Precision = uint32(p.Precision)
ntpSource.Delay = p.PeerDelay
ntpSource.RootDisp = p.RootDispersion
}
if n != nil {
// This field is zero padded in chrony, so we need to trim it
ntpSource.Hostname = string(bytes.TrimRight(n.Name[:], "\x00"))
}

return &ntpSource, nil
}

type chronyConn struct {
net.Conn
local string
}

// dialUnixWithChronyd() established connection. The code is a based on the
// https://github.com/facebook/time/blob/main/cmd/ntpcheck/checker
func dialUnixWithChronyd(address string) (*chronyConn, error) {
base, _ := path.Split(address)
local := path.Join(base, fmt.Sprintf("chronyc.%d.sock", os.Getpid()))
conn, err := net.DialUnix("unixgram",
&net.UnixAddr{Name: local, Net: "unixgram"},
&net.UnixAddr{Name: address, Net: "unixgram"},
)
if err != nil {
return nil, err
}
if err := os.Chmod(local, 0600); err != nil {
conn.Close()
return nil, err
}
return &chronyConn{Conn: conn, local: local}, nil
}

// getNTPSourcesInfo() returns `info.ZInfoNTPSources`. The code is based on the
// https://github.com/facebook/time/blob/main/cmd/ntpcheck/checker
func getNTPSourcesInfo(ctx *zedagentContext) *info.ZInfoNTPSources {
conn, err := dialUnixWithChronyd(unixChronydPath)
if err != nil {
log.Errorf("getNTPSourcesInfo: can't connect to chronyd: %v", err)
return nil
}
defer func() {
conn.Close()
os.Remove(conn.local)
}()

client := chrony.Client{Sequence: 1, Connection: conn}
sourcesReq := chrony.NewSourcesPacket()
packet, err := client.Communicate(sourcesReq)
if err != nil {
log.Errorf("getNTPSourcesInfo: failed to get 'sources' response: %v", err)
return nil
}
sources, ok := packet.(*chrony.ReplySources)
if !ok {
log.Errorf("getNTPSourcesInfo: failed to convert to reply: %v", err)
return nil
}

info := info.ZInfoNTPSources{}

for i := 0; i < sources.NSources; i++ {
sourceDataReq := chrony.NewSourceDataPacket(int32(i))
packet, err = client.Communicate(sourceDataReq)
if err != nil {
log.Errorf("getNTPSourcesInfo: failed to get 'sourcedata' response for source #%d, err %v", i, err)
return nil
}
sourceData, ok := packet.(*chrony.ReplySourceData)
if !ok {
log.Errorf("getNTPSourcesInfo: got wrong 'sourcedata' response %+v", packet)
return nil
}

// get ntpdata when using a unix socket
var ntpData *chrony.ReplyNTPData
if sourceData.Mode != chrony.SourceModeRef {
ntpDataReq := chrony.NewNTPDataPacket(sourceData.IPAddr)
packet, err = client.Communicate(ntpDataReq)
if err != nil {
log.Errorf("getNTPSourcesInfo: failed to get 'ntpdata' response for source #%d", i)
return nil
}
ntpData, ok = packet.(*chrony.ReplyNTPData)
if !ok {
log.Errorf("getNTPSourcesInfo: got wrong 'ntpdata' response %+v", packet)
return nil
}
}
var ntpSourceName *chrony.ReplyNTPSourceName
if sourceData.Mode != chrony.SourceModeRef {
ntpSourceNameReq := chrony.NewNTPSourceNamePacket(sourceData.IPAddr)
packet, err = client.Communicate(ntpSourceNameReq)
if err != nil {
log.Errorf("getNTPSourcesInfo: failed to get 'sourcename' response for source #%d", i)
return nil
}
ntpSourceName, ok = packet.(*chrony.ReplyNTPSourceName)
if !ok {
log.Errorf("getNTPSourcesInfo: got wrong 'sourcename' response %+v", packet)
return nil
}
}
ntpSource, err := createNTPSource(sourceData, ntpData, ntpSourceName)
if err != nil {
log.Errorf("getNTPSourcesInfo: failed to create Peer structure from response packet for peer=%s", sourceData.IPAddr)
return nil
}
info.Sources = append(info.Sources, ntpSource)
}

return &info
}
7 changes: 7 additions & 0 deletions pkg/pillar/cmd/zedagent/parseconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -2496,6 +2496,13 @@ func parseConfigItems(ctx *getconfigContext, config *zconfig.EdgeDevConfig,
ctx.zedagentCtx.gcpMaintenanceMode = newMaintenanceMode
mergeMaintenanceMode(ctx.zedagentCtx)
}
oldNTPSourcesInterval :=
oldGlobalConfig.GlobalValueInt(types.NTPSourcesInterval)
newNTPSourcesInterval :=
newGlobalConfig.GlobalValueInt(types.NTPSourcesInterval)
if oldNTPSourcesInterval != newNTPSourcesInterval {
updateNTPSourcesTimer(ctx, newNTPSourcesInterval)
}

pub := ctx.zedagentCtx.pubGlobalConfig
err := pub.Publish("global", *gcPtr)
Expand Down
18 changes: 18 additions & 0 deletions pkg/pillar/cmd/zedagent/zedagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type zedagentContext struct {
triggerDeviceInfo chan<- destinationBitset
triggerHwInfo chan<- destinationBitset
triggerLocationInfo chan<- destinationBitset
triggerNTPSourcesInfo chan<- destinationBitset
triggerObjectInfo chan<- infoForObjectKey
zbootRestarted bool // published by baseosmgr
subOnboardStatus pubsub.Subscription
Expand Down Expand Up @@ -330,11 +331,13 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
triggerDeviceInfo := make(chan destinationBitset, 1)
triggerHwInfo := make(chan destinationBitset, 1)
triggerLocationInfo := make(chan destinationBitset, 1)
triggerNTPSourcesInfo := make(chan destinationBitset, 1)
triggerObjectInfo := make(chan infoForObjectKey, 1)
zedagentCtx.flowlogQueue = flowlogQueue
zedagentCtx.triggerDeviceInfo = triggerDeviceInfo
zedagentCtx.triggerHwInfo = triggerHwInfo
zedagentCtx.triggerLocationInfo = triggerLocationInfo
zedagentCtx.triggerNTPSourcesInfo = triggerNTPSourcesInfo
zedagentCtx.triggerObjectInfo = triggerObjectInfo

// Initialize all zedagent publications.
Expand Down Expand Up @@ -491,6 +494,11 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
getconfigCtx.locationCloudTickerHandle = <-handleChannel
getconfigCtx.locationAppTickerHandle = <-handleChannel

// start the NTP sources reporting task
log.Functionf("Creating %s at %s", "ntpTimerTask", agentlog.GetMyStack())
go ntpSourcesTimerTask(zedagentCtx, handleChannel, triggerNTPSourcesInfo)
getconfigCtx.ntpSourcesTickerHandle = <-handleChannel

//trigger channel for localProfile state machine
getconfigCtx.sideController.localProfileTrigger = make(chan Notify, 1)
//process saved local profile
Expand Down Expand Up @@ -2008,6 +2016,15 @@ func triggerPublishLocationToDest(ctxPtr *zedagentContext, dest destinationBitse
ctxPtr.triggerLocationInfo <- dest
}

func triggerPublishNTPSourcesToDest(ctxPtr *zedagentContext, dest destinationBitset) {
if ctxPtr.getconfigCtx.ntpSourcesTickerHandle == nil {
// NTP sources reporting task is not yet running.
return
}
log.Function("Triggered publishNTPSources")
ctxPtr.triggerNTPSourcesInfo <- dest
}

func triggerPublishAllInfo(ctxPtr *zedagentContext, dest destinationBitset) {

log.Function("Triggered PublishAllInfo")
Expand Down Expand Up @@ -2075,6 +2092,7 @@ func triggerPublishAllInfo(ctxPtr *zedagentContext, dest destinationBitset) {
}
}
triggerPublishLocationToDest(ctxPtr, dest)
triggerPublishNTPSourcesToDest(ctxPtr, dest)
}()
}

Expand Down
1 change: 1 addition & 0 deletions pkg/pillar/dpcmanager/dpcmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ func globalConfig() types.ConfigItemValueMap {
gcp.SetGlobalValueInt(types.NetworkGeoRedoTime, 3)
gcp.SetGlobalValueInt(types.LocationCloudInterval, 10)
gcp.SetGlobalValueInt(types.LocationAppInterval, 2)
gcp.SetGlobalValueInt(types.NTPSourcesInterval, 5)
gcp.SetGlobalValueBool(types.NetDumpEnable, false)
return *gcp
}
Expand Down
Loading

0 comments on commit 4582cf4

Please sign in to comment.