|
| 1 | +/* |
| 2 | + * Copyright 2019 the go-netty project |
| 3 | + * |
| 4 | + * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | + * you may not use this file except in compliance with the License. |
| 6 | + * You may obtain a copy of the License at |
| 7 | + * |
| 8 | + * https://www.apache.org/licenses/LICENSE-2.0 |
| 9 | + * |
| 10 | + * Unless required by applicable law or agreed to in writing, software |
| 11 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | + * See the License for the specific language governing permissions and |
| 14 | + * limitations under the License. |
| 15 | + */ |
| 16 | + |
| 17 | +package netty |
| 18 | + |
| 19 | +import ( |
| 20 | + "context" |
| 21 | + "os" |
| 22 | + "os/signal" |
| 23 | + |
| 24 | + "github.com/go-netty/go-netty/transport" |
| 25 | + "github.com/go-netty/go-netty/utils" |
| 26 | +) |
| 27 | + |
| 28 | +type Bootstrap interface { |
| 29 | + Context() context.Context |
| 30 | + WithContext(ctx context.Context) Bootstrap |
| 31 | + ChannelExecutor(executorFactory ChannelExecutorFactory) Bootstrap |
| 32 | + ChannelId(channelIdFactory ChannelIdFactory) Bootstrap |
| 33 | + Pipeline(pipelineFactory PipelineFactory) Bootstrap |
| 34 | + Channel(channelFactory ChannelFactory) Bootstrap |
| 35 | + Transport(factory TransportFactory) Bootstrap |
| 36 | + ChildInitializer(initializer ChannelInitializer) Bootstrap |
| 37 | + ClientInitializer(initializer ChannelInitializer) Bootstrap |
| 38 | + Listen(url string, option ...transport.Option) Bootstrap |
| 39 | + Connect(url string, attachment Attachment, option ...transport.Option) (Channel, error) |
| 40 | + RunForever(signals ...os.Signal) |
| 41 | + Run() Bootstrap |
| 42 | + Stop() Bootstrap |
| 43 | +} |
| 44 | + |
| 45 | +func NewBootstrap() Bootstrap { |
| 46 | + return new(bootstrap). |
| 47 | + WithContext(context.Background()). |
| 48 | + ChannelId(SequenceId()). |
| 49 | + Pipeline(NewPipeline()). |
| 50 | + Channel(NewChannel(128)) |
| 51 | +} |
| 52 | + |
| 53 | +type bootstrap struct { |
| 54 | + bootstrapOptions |
| 55 | + listenOptions []transport.Option |
| 56 | + acceptor transport.Acceptor |
| 57 | +} |
| 58 | + |
| 59 | +func (b *bootstrap) WithContext(ctx context.Context) Bootstrap { |
| 60 | + b.bootstrapCtx, b.bootstrapCancel = context.WithCancel(ctx) |
| 61 | + return b |
| 62 | +} |
| 63 | + |
| 64 | +func (b *bootstrap) Context() context.Context { |
| 65 | + return b.bootstrapCtx |
| 66 | +} |
| 67 | + |
| 68 | +func (b *bootstrap) ChannelExecutor(executor ChannelExecutorFactory) Bootstrap { |
| 69 | + b.executorFactory = executor |
| 70 | + return b |
| 71 | +} |
| 72 | + |
| 73 | +func (b *bootstrap) ChannelId(channelIdFactory ChannelIdFactory) Bootstrap { |
| 74 | + b.channelIdFactory = channelIdFactory |
| 75 | + return b |
| 76 | +} |
| 77 | + |
| 78 | +func (b *bootstrap) Pipeline(pipelineFactory PipelineFactory) Bootstrap { |
| 79 | + b.pipelineFactory = pipelineFactory |
| 80 | + return b |
| 81 | +} |
| 82 | + |
| 83 | +func (b *bootstrap) Channel(channelFactory ChannelFactory) Bootstrap { |
| 84 | + b.channelFactory = channelFactory |
| 85 | + return b |
| 86 | +} |
| 87 | + |
| 88 | +func (b *bootstrap) Transport(factory TransportFactory) Bootstrap { |
| 89 | + b.transportFactory = factory |
| 90 | + return b |
| 91 | +} |
| 92 | + |
| 93 | +func (b *bootstrap) ChildInitializer(initializer ChannelInitializer) Bootstrap { |
| 94 | + b.childInitializer = initializer |
| 95 | + return b |
| 96 | +} |
| 97 | + |
| 98 | +func (b *bootstrap) ClientInitializer(initializer ChannelInitializer) Bootstrap { |
| 99 | + b.clientInitializer = initializer |
| 100 | + return b |
| 101 | +} |
| 102 | + |
| 103 | +func (b *bootstrap) serveChannel(channelExecutor ChannelExecutor, channel Channel, childChannel bool) { |
| 104 | + |
| 105 | + // 初始化流水线,注册用户定义事件处理器 |
| 106 | + if childChannel { |
| 107 | + b.childInitializer(channel) |
| 108 | + } else { |
| 109 | + b.clientInitializer(channel) |
| 110 | + } |
| 111 | + |
| 112 | + // 需要插入Executor |
| 113 | + if channelExecutor != nil { |
| 114 | + |
| 115 | + // 找到最后一个解码器的位置 |
| 116 | + position := channel.Pipeline().LastIndexOf(func(handler Handler) bool { |
| 117 | + _, ok := handler.(CodecHandler) |
| 118 | + return ok |
| 119 | + }) |
| 120 | + |
| 121 | + // 必须要有Codec |
| 122 | + utils.AssertIf(-1 == position, "missing codec.") |
| 123 | + |
| 124 | + // 插入到解码器后面 |
| 125 | + channel.Pipeline().AddHandler(position, channelExecutor) |
| 126 | + } |
| 127 | + |
| 128 | + // 开始服务 |
| 129 | + channel.Pipeline().serveChannel(channel) |
| 130 | +} |
| 131 | + |
| 132 | +func (b *bootstrap) serveTransport(transport transport.Transport, attachment Attachment, childChannel bool) Channel { |
| 133 | + |
| 134 | + // 创建一个流水线, 用于定义事件处理流程 |
| 135 | + pipeline := b.pipelineFactory() |
| 136 | + |
| 137 | + // 生成ChanelId |
| 138 | + cid := b.channelIdFactory() |
| 139 | + |
| 140 | + // 创建一个Channel用于读写数据 |
| 141 | + channel := b.channelFactory(cid, b.bootstrapCtx, pipeline, transport) |
| 142 | + |
| 143 | + // 挂载附件 |
| 144 | + if nil != attachment { |
| 145 | + channel.SetAttachment(attachment) |
| 146 | + } |
| 147 | + |
| 148 | + // Channel Executor |
| 149 | + var chExecutor ChannelExecutor |
| 150 | + if nil != b.executorFactory { |
| 151 | + chExecutor = b.executorFactory(channel.Context()) |
| 152 | + } |
| 153 | + |
| 154 | + b.serveChannel(chExecutor, channel, childChannel) |
| 155 | + return channel |
| 156 | +} |
| 157 | + |
| 158 | +func (b *bootstrap) createListener() error { |
| 159 | + |
| 160 | + // 不需要创建 |
| 161 | + if len(b.listenOptions) <= 0 { |
| 162 | + return nil |
| 163 | + } |
| 164 | + |
| 165 | + options, err := transport.ParseOptions(b.listenOptions...) |
| 166 | + if nil != err { |
| 167 | + return err |
| 168 | + } |
| 169 | + |
| 170 | + // 监听服务 |
| 171 | + l, err := b.transportFactory.Listen(options) |
| 172 | + if nil != err { |
| 173 | + return err |
| 174 | + } |
| 175 | + |
| 176 | + b.acceptor = l |
| 177 | + return nil |
| 178 | +} |
| 179 | + |
| 180 | +func (b *bootstrap) startListener() { |
| 181 | + |
| 182 | + if nil == b.acceptor { |
| 183 | + return |
| 184 | + } |
| 185 | + |
| 186 | + go func() { |
| 187 | + |
| 188 | + for { |
| 189 | + // 接受一个连接 |
| 190 | + t, err := b.acceptor.Accept() |
| 191 | + if nil != err { |
| 192 | + break |
| 193 | + } |
| 194 | + |
| 195 | + // 开始服务 |
| 196 | + b.serveTransport(t, nil, true) |
| 197 | + } |
| 198 | + }() |
| 199 | +} |
| 200 | + |
| 201 | +func (b *bootstrap) stopListener() { |
| 202 | + if b.acceptor != nil { |
| 203 | + _ = b.acceptor.Close() |
| 204 | + } |
| 205 | +} |
| 206 | + |
| 207 | +func (b *bootstrap) Connect(url string, attachment Attachment, option ...transport.Option) (Channel, error) { |
| 208 | + |
| 209 | + transOptions := []transport.Option{ |
| 210 | + // remote address. |
| 211 | + transport.WithAddress(url), |
| 212 | + // context |
| 213 | + transport.WithContext(b.Context()), |
| 214 | + } |
| 215 | + transOptions = append(transOptions, option...) |
| 216 | + |
| 217 | + options, err := transport.ParseOptions(transOptions...) |
| 218 | + if nil != err { |
| 219 | + return nil, err |
| 220 | + } |
| 221 | + |
| 222 | + // 连接对端 |
| 223 | + t, err := b.transportFactory.Connect(options) |
| 224 | + if nil != err { |
| 225 | + return nil, err |
| 226 | + } |
| 227 | + |
| 228 | + // 开始服务 |
| 229 | + return b.serveTransport(t, attachment, false), nil |
| 230 | +} |
| 231 | + |
| 232 | +func (b *bootstrap) Listen(url string, option ...transport.Option) Bootstrap { |
| 233 | + b.listenOptions = []transport.Option{ |
| 234 | + // remote address |
| 235 | + transport.WithAddress(url), |
| 236 | + // context. |
| 237 | + transport.WithContext(b.Context()), |
| 238 | + } |
| 239 | + b.listenOptions = append(b.listenOptions, option...) |
| 240 | + return b |
| 241 | +} |
| 242 | + |
| 243 | +func (b *bootstrap) Stop() Bootstrap { |
| 244 | + b.bootstrapCancel() |
| 245 | + return b |
| 246 | +} |
| 247 | + |
| 248 | +func (b *bootstrap) Run() Bootstrap { |
| 249 | + go b.RunForever() |
| 250 | + return b |
| 251 | +} |
| 252 | + |
| 253 | +func (b *bootstrap) RunForever(signals ...os.Signal) { |
| 254 | + |
| 255 | + // 初始化listener |
| 256 | + err := b.createListener() |
| 257 | + utils.Assert(err) |
| 258 | + |
| 259 | + // 启动listener |
| 260 | + b.startListener() |
| 261 | + |
| 262 | + // 关闭listener |
| 263 | + defer b.stopListener() |
| 264 | + |
| 265 | + // 停止服务 |
| 266 | + defer b.Stop() |
| 267 | + |
| 268 | + // 监听信号 |
| 269 | + var sigChan = make(chan os.Signal, 1) |
| 270 | + |
| 271 | + if len(signals) > 0 { |
| 272 | + signal.Notify(sigChan, signals...) |
| 273 | + } |
| 274 | + |
| 275 | + // 等待收到信号或者外部主动停止 |
| 276 | + select { |
| 277 | + case <-b.bootstrapCtx.Done(): |
| 278 | + case <-sigChan: |
| 279 | + } |
| 280 | +} |
0 commit comments