This content originally appeared on DEV Community and was authored by Minwook Je
Kubernetes Native Edge Computing Framework
- Kubernetes Native API at Edge.
- Optimized usage of resource at the edge. Memory footprint down to ~70MB.
- Easy communication between application and devices for IOT and Industrial Internet.
Architecture
1. Architecture:: Common Components
1.1.Beehive
Beehive is an in‑process messaging bus (go-channels) that each CloudCore/EdgeCore process initializes once. It routes within a single process; HA coordination across CloudCore instances relies on ObjectSync/ClusterObjectSync CRDs (shared via K8s) and CloudHub's per-node session ownership. Group broadcasts are local to the process.
- Per‑process: One Beehive per CloudCore/EdgeCore process (no cross‑process bus).
- Routing scope: Modules talk via Beehive only within the same process.
- HA sync: Cross‑instance consistency uses CRDs (ObjectSync/ClusterObjectSync) to record "last synced resourceVersion".
- Delivery ownership: The CloudCore instance that owns the node's active CloudHub session actually delivers messages.
- Downstream queues: CloudHub keeps per‑node NodeMessagePool (ACK/No‑ACK queues/stores).
- ACK path: Edge ACK advances CRD status so all instances see the same sync point.
Per‑process Beehive initialization
// file: staging/src/github.com/kubeedge/beehive/pkg/core/core.go
func StartModules() {
// initialize channel-mode beehive for this process
beehiveContext.InitContext([]string{common.MsgCtxTypeChannel})
for name, module := range GetModules() {
beehiveContext.AddModule(&common.ModuleInfo{ModuleName: name, ModuleType: module.contextType})
beehiveContext.AddModuleGroup(name, module.module.Group())
go localModuleKeeper(module)
}
}
Intra‑process routing via Beehive
// file: cloud/pkg/common/messagelayer/context.go
func (cml *ContextMessageLayer) Send(message model.Message) error {
module := cml.SendModuleName
if len(cml.SendRouterModuleName) != 0 && isRouterMsg(message) {
module = cml.SendRouterModuleName
}
beehiveContext.Send(module, message) // in-process only
return nil
}
SyncController: publish updates when K8s RV > CRD RV
// file: cloud/pkg/synccontroller/objectsync.go
if CompareResourceVersion(objectRV, sync.Status.ObjectResourceVersion) > 0 {
msg := buildEdgeControllerMessage(nodeName, sync.Namespace, resourceType,
sync.Spec.ObjectName, model.UpdateOperation, obj)
beehiveContext.Send(commonconst.DefaultContextSendModuleName, *msg) // to CloudHub in same process
}
CloudHub dispatcher: enqueue based on CRD state; bootstrap CRD on first sight
// file: cloud/pkg/cloudhub/dispatcher/message_dispatcher.go
// namespaced resource path
objectSync := &v1alpha1.ObjectSync{ /* APIVersion, Kind, Name */ }
created, err := md.reliableClient.ReliablesyncsV1alpha1().
ObjectSyncs(resourceNamespace).Create(ctx, objectSync, metav1.CreateOptions{})
created.Status.ObjectResourceVersion = "0"
_, _ = md.reliableClient.ReliablesyncsV1alpha1().
ObjectSyncs(resourceNamespace).UpdateStatus(ctx, created, metav1.UpdateOptions{})
// non-namespaced (cluster) resource path
cos := &v1alpha1.ClusterObjectSync{ /* APIVersion, Kind, Name */ }
cos, _ = md.reliableClient.ReliablesyncsV1alpha1().
ClusterObjectSyncs().Create(ctx, cos, metav1.CreateOptions{})
cos.Status.ObjectResourceVersion = "0"
_, _ = md.reliableClient.ReliablesyncsV1alpha1().
ClusterObjectSyncs().UpdateStatus(ctx, cos, metav1.UpdateOptions{})
Edge ACK advances CRDs (shared sync point)
// file: cloud/pkg/cloudhub/session/node_session.go
// namespaced
objectSync.Status.ObjectResourceVersion = msg.GetResourceVersion()
_, _ = ns.reliableClient.ReliablesyncsV1alpha1().
ObjectSyncs(resourceNamespace).UpdateStatus(ctx, objectSync, metav1.UpdateOptions{})
// cluster-scoped
clusterObjectSync.Status.ObjectResourceVersion = msg.GetResourceVersion()
_, _ = ns.reliableClient.ReliablesyncsV1alpha1().
ClusterObjectSyncs().UpdateStatus(ctx, clusterObjectSync, metav1.UpdateOptions{})
Session ownership and per‑node message pools (only in CloudHub)
// file: cloud/pkg/cloudhub/handler/message_handler.go
nodeMessagePool := common.InitNodeMessagePool(nodeID)
mh.MessageDispatcher.AddNodeMessagePool(nodeID, nodeMessagePool)
nodeSession := session.NewNodeSession(nodeID, projectID, connection, keepaliveInterval,
nodeMessagePool, mh.reliableClient)
mh.SessionManager.AddSession(nodeSession)
// on disconnect:
mh.MessageDispatcher.DeleteNodeMessagePool(nodeInfo.NodeID, nodeMessagePool)
mh.SessionManager.DeleteSession(nodeSession)
Downstream delivery to local Beehive groups (within owning instance)
// file: cloud/pkg/cloudhub/dispatcher/message_dispatcher.go
func (md *messageDispatcher) Publish(msg *beehivemodel.Message) error {
switch msg.Router.Source {
case metaserver.MetaServerSource:
beehivecontext.Send(modules.DynamicControllerModuleName, *msg)
case model.ResTwin:
beehivecontext.SendToGroup(modules.DeviceControllerModuleGroup, *msg)
default:
beehivecontext.SendToGroup(modules.EdgeControllerGroupName, *msg)
}
return nil
}
NodeMessagePool structure (CloudHub per‑node queues/stores)
// file: cloud/pkg/cloudhub/common/message_pool.go
type NodeMessagePool struct {
AckMessageStore cache.Store
AckMessageQueue workqueue.RateLimitingInterface
NoAckMessageStore cache.Store
NoAckMessageQueue workqueue.RateLimitingInterface
}
2. Architecture:: Cloud Components
3. Architecture:: Edge Components
This content originally appeared on DEV Community and was authored by Minwook Je

Minwook Je | Sciencx (2025-09-08T07:10:11+00:00) KubeEdge. Retrieved from https://www.scien.cx/2025/09/08/kubeedge/
Please log in to upload a file.
There are no updates yet.
Click the Upload button above to add an update.